You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/04 10:23:57 UTC

[shardingsphere] branch master updated: DistributedScalingJobService add callback function (#8504)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa295fc  DistributedScalingJobService add callback function (#8504)
fa295fc is described below

commit fa295fc35bcfa63085bae74266dea315f1dec014
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Dec 4 18:23:10 2020 +0800

    DistributedScalingJobService add callback function (#8504)
    
    * DistributedScalingJobService add callback function
    
    * Update governance dependency
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../shardingsphere-scaling-core/pom.xml            | 11 ++-
 .../scaling/core/config/JobConfiguration.java      |  4 +-
 .../core/service/AbstractScalingJobService.java    | 67 ++++++++++++-----
 .../ScalingCallback.java}                          | 31 +++-----
 .../scaling/core/service/ScalingJobService.java    | 21 ++----
 .../service/impl/DistributedScalingJobService.java | 13 +++-
 .../scaling/core/utils/ProxyConfigurationUtil.java | 86 ----------------------
 .../scaling/core/utils/ScalingTaskUtil.java        | 21 +++++-
 .../scaling/core/utils/TaskConfigurationUtil.java  |  2 +-
 .../impl/DistributedScalingJobServiceTest.java     | 63 +++++++++++-----
 .../config_sharding_sphere_jdbc_target.json        | 38 ++++++++++
 .../test/resources/proxy_config-sharding_1.yaml    | 64 ----------------
 .../test/resources/proxy_config-sharding_2.yaml    | 84 ---------------------
 13 files changed, 190 insertions(+), 315 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index 19911d2..90dbe30 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-governance-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-governance-repository-zookeeper-curator</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -64,11 +69,5 @@
             <artifactId>h2</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-proxy-common</artifactId>
-            <version>5.0.0-RC1-SNAPSHOT</version>
-            <scope>compile</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index cb6bc0d..6ebaae4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -37,7 +37,9 @@ public final class JobConfiguration {
     
     private int shardingItem;
     
-    private int shardingSize = 10000000;
+    private int shardingSize = 1000 * 10000;
     
     private boolean running = true;
+    
+    private long allowDelay = 60 * 1000L;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index f9a2a20..93f65f7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -17,39 +17,47 @@
 
 package org.apache.shardingsphere.scaling.core.service;
 
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.utils.ProxyConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract scaling job service.
  */
 public abstract class AbstractScalingJobService implements ScalingJobService {
     
-    @Override
-    public boolean shouldScaling(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
-        ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
-        TaskConfigurationUtil.fillInShardingTables(scalingConfig);
-        return shouldScaling(scalingConfig);
-    }
-    
-    private boolean shouldScaling(final ScalingConfiguration scalingConfig) {
-        return scalingConfig.getJobConfiguration().getShardingTables().length > 0;
-    }
+    private static final ScheduledExecutorService FINISH_CHECK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("Scaling-finish-check-%d"));
     
     @Override
-    public Optional<ScalingJob> start(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
-        ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
-        TaskConfigurationUtil.fillInShardingTables(scalingConfig);
-        if (!shouldScaling(scalingConfig)) {
-            return Optional.empty();
+    public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
+        Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
+        if (!result.isPresent()) {
+            scalingCallback.onSuccess();
+            return result;
         }
+        FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 3, 1, TimeUnit.MINUTES);
+        return result;
+    }
+    
+    private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
+        ScalingConfiguration scalingConfig = new ScalingConfiguration();
+        scalingConfig.setRuleConfiguration(
+                new RuleConfiguration(new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule), new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
+        scalingConfig.setJobConfiguration(new JobConfiguration());
         return start(scalingConfig);
     }
     
@@ -73,4 +81,29 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
         }
         return result;
     }
+    
+    @RequiredArgsConstructor
+    private class JobFinishChecker implements Runnable {
+        
+        private final ScalingJob scalingJob;
+        
+        private final ScalingCallback scalingCallback;
+        
+        private boolean finished;
+        
+        @Override
+        public void run() {
+            if (finished) {
+                return;
+            }
+            JobProgress jobProgress = getProgress(scalingJob.getJobId());
+            if (jobProgress.getStatus().contains("FAILURE")) {
+                finished = true;
+                scalingCallback.onFailure();
+            } else if (ScalingTaskUtil.allTasksAlmostFinished(jobProgress, scalingJob.getScalingConfig().getJobConfiguration())) {
+                finished = true;
+                scalingCallback.onSuccess();
+            }
+        }
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
similarity index 65%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
index cb6bc0d..b20eba2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
@@ -15,29 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
-
-import lombok.Getter;
-import lombok.Setter;
+package org.apache.shardingsphere.scaling.core.service;
 
 /**
- * Job configuration.
+ * Scaling callback.
  */
-@Setter
-@Getter
-public final class JobConfiguration {
-    
-    private Long jobId;
-    
-    private int concurrency = 3;
-    
-    private int retryTimes = 3;
-    
-    private String[] shardingTables;
-    
-    private int shardingItem;
+public interface ScalingCallback {
     
-    private int shardingSize = 10000000;
+    /**
+     * Callback when execute success.
+     */
+    void onSuccess();
     
-    private boolean running = true;
+    /**
+     * Callback when execute failure.
+     */
+    void onFailure();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 9002a3d..87aaa3a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -39,15 +39,6 @@ public interface ScalingJobService {
     List<ScalingJob> listJobs();
     
     /**
-     * Check new yaml proxy configuration if should scaling.
-     *
-     * @param oldYamlProxyConfig old yaml proxy configuration
-     * @param newYamlProxyConfig new yaml proxy configuration
-     * @return if should scaling
-     */
-    boolean shouldScaling(String oldYamlProxyConfig, String newYamlProxyConfig);
-    
-    /**
      * Start scaling job.
      *
      * @param scalingConfig scaling job configuration
@@ -55,15 +46,17 @@ public interface ScalingJobService {
      */
     Optional<ScalingJob> start(ScalingConfiguration scalingConfig);
     
-    
     /**
-     * Start scaling job if it should scaling.
+     * Start scaling job.
      *
-     * @param oldYamlProxyConfig old yaml proxy configuration
-     * @param newYamlProxyConfig new yaml proxy configuration
+     * @param sourceDataSource source data source
+     * @param sourceRule source rule
+     * @param targetDataSource target data source
+     * @param targetRule target rule
+     * @param scalingCallback scaling callback
      * @return scaling job
      */
-    Optional<ScalingJob> start(String oldYamlProxyConfig, String newYamlProxyConfig);
+    Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
     
     /**
      * Stop a job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 0b06059..069a044 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -57,10 +57,17 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
     
     @Override
     public Optional<ScalingJob> start(final ScalingConfiguration scalingConfig) {
-        ScalingJob scalingJob = new ScalingJob();
         TaskConfigurationUtil.fillInShardingTables(scalingConfig);
-        updateScalingConfig(scalingJob.getJobId(), scalingConfig);
-        return Optional.of(scalingJob);
+        if (shouldScaling(scalingConfig)) {
+            ScalingJob scalingJob = new ScalingJob();
+            updateScalingConfig(scalingJob.getJobId(), scalingConfig);
+            return Optional.of(scalingJob);
+        }
+        return Optional.empty();
+    }
+    
+    private boolean shouldScaling(final ScalingConfiguration scalingConfig) {
+        return scalingConfig.getJobConfiguration().getShardingTables().length > 0;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java
deleted file mode 100644
index 4a9e7d5..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfiguration;
-import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.proxy.config.yaml.YamlDataSourceParameter;
-import org.apache.shardingsphere.proxy.config.yaml.YamlProxyRuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-/**
- * Proxy configuration Util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ProxyConfigurationUtil {
-    
-    /**
-     * Yaml proxy configuration transform to {@code ScalingConfiguration}.
-     *
-     * @param oldYamlProxyConfig old yaml proxy configuration
-     * @param newYamlProxyConfig new yaml proxy configuration
-     * @return {@code ScalingConfiguration} instance
-     */
-    public static ScalingConfiguration toScalingConfig(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
-        ScalingConfiguration result = new ScalingConfiguration();
-        result.setRuleConfiguration(new RuleConfiguration(toDataSourceConfig(oldYamlProxyConfig), toDataSourceConfig(newYamlProxyConfig)));
-        result.setJobConfiguration(new JobConfiguration());
-        return result;
-    }
-    
-    private static DataSourceConfiguration toDataSourceConfig(final String yamlProxyConfig) {
-        YamlProxyRuleConfiguration proxyRuleConfig = YamlEngine.unmarshal(yamlProxyConfig, YamlProxyRuleConfiguration.class);
-        return new ShardingSphereJDBCDataSourceConfiguration(getDataSourceConfig(proxyRuleConfig), getRuleConfig(proxyRuleConfig));
-    }
-    
-    private static String getDataSourceConfig(final YamlProxyRuleConfiguration proxyRuleConfig) {
-        YamlDataSourceConfigurationWrap result = new YamlDataSourceConfigurationWrap();
-        Map<String, YamlDataSourceConfiguration> dataSources = proxyRuleConfig.getDataSources().entrySet().stream()
-                .collect(Collectors.toMap(Entry::getKey, entry -> toYamlDataSourceConfig(proxyRuleConfig, entry.getValue())));
-        result.setDataSources(dataSources);
-        return YamlEngine.marshal(result);
-    }
-    
-    @SneakyThrows(IllegalAccessException.class)
-    private static YamlDataSourceConfiguration toYamlDataSourceConfig(final YamlProxyRuleConfiguration proxyRuleConfig, final YamlDataSourceParameter yamlDataSourceParameter) {
-        YamlDataSourceConfiguration result = new YamlDataSourceConfiguration();
-        result.setDataSourceClassName("com.zaxxer.hikari.HikariDataSource");
-        Map<String, Object> props = new HashMap<>(ReflectionUtil.getFieldMap(yamlDataSourceParameter));
-        result.setProps(props);
-        return result;
-    }
-    
-    private static String getRuleConfig(final YamlProxyRuleConfiguration proxyRuleConfig) {
-        YamlProxyRuleConfiguration result = new YamlProxyRuleConfiguration();
-        result.setRules(proxyRuleConfig.getRules());
-        return YamlEngine.marshal(result);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index cc55a09..c62f2de 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -17,12 +17,16 @@
 
 package org.apache.shardingsphere.scaling.core.utils;
 
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * Scaling task util.
@@ -35,11 +39,24 @@ public final class ScalingTaskUtil {
      * @param inventoryTasks to check inventory tasks
      * @return is finished
      */
-    public static boolean allInventoryTasksFinished(final List<ScalingTask> inventoryTasks) {
+    public static boolean allInventoryTasksFinished(final Collection<ScalingTask> inventoryTasks) {
         return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPositionManager().getPosition() instanceof FinishedPosition);
     }
     
     /**
+     * All inventory tasks is finished and all Incremental tasks delay less than allow value.
+     *
+     * @param jobProgress job pProgress
+     * @param jobConfig job configuration
+     * @return almost finished or not
+     */
+    public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
+        return jobProgress.getInventoryTaskProgress().values().stream().flatMap(Collection::stream).allMatch(each -> ((InventoryTaskProgress) each).isFinished())
+                && jobProgress.getIncrementalTaskProgress().values().stream().flatMap(Collection::stream)
+                .allMatch(each -> ((IncrementalTaskProgress) each).getDelayMillisecond() < jobConfig.getAllowDelay());
+    }
+    
+    /**
      * Get scaling listener path.
      *
      * @param paths sub paths.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
index 8009863..9d99fa9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
@@ -237,10 +237,10 @@ public final class TaskConfigurationUtil {
         }
         ShardingSphereJDBCDataSourceConfiguration target =
                 (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
-        List<String> result = new ArrayList<>();
         Set<String> modifiedDataSources = getModifiedDataSources(source.getDataSource(), target.getDataSource());
         Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap = getShardingRuleConfigMap(source.getRule());
         Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap = getShardingRuleConfigMap(target.getRule());
+        List<String> result = new ArrayList<>();
         newShardingRuleConfigMap.keySet().forEach(each -> {
             if (!oldShardingRuleConfigMap.containsKey(each)) {
                 return;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 230de79..345fe89 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -24,12 +24,14 @@ import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
 import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -40,6 +42,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
@@ -69,15 +72,6 @@ public final class DistributedScalingJobServiceTest {
     }
     
     @Test
-    @SneakyThrows(IOException.class)
-    public void assertShouldScaling() {
-        String oldConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_1.yaml");
-        String newConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_2.yaml");
-        assertFalse(scalingJobService.shouldScaling(oldConfig, oldConfig));
-        assertTrue(scalingJobService.shouldScaling(oldConfig, newConfig));
-    }
-    
-    @Test
     public void assertStartWithScalingConfig() {
         Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
         assertTrue(shardingScalingJob.isPresent());
@@ -85,14 +79,22 @@ public final class DistributedScalingJobServiceTest {
     }
     
     @Test
-    @SneakyThrows(IOException.class)
-    public void assertStartWithProxyConfig() {
-        String oldConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_1.yaml");
-        String newConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_2.yaml");
-        assertFalse(scalingJobService.start(oldConfig, oldConfig).isPresent());
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(oldConfig, newConfig);
-        assertTrue(shardingScalingJob.isPresent());
-        assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
+    public void assertStartWithCallbackImmediately() {
+        ScalingConfiguration scalingConfig = mockScalingConfiguration();
+        ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
+        AtomicBoolean successCallback = new AtomicBoolean();
+        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback(successCallback));
+        assertFalse(scalingJob.isPresent());
+        assertTrue(successCallback.get());
+    }
+    
+    @Test
+    public void assertStartWithCallbackSuccess() throws IOException {
+        ScalingConfiguration scalingConfig = ScalingConfigurationUtil.initConfig("/config_sharding_sphere_jdbc_target.json");
+        ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
+        ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), mockScalingCallback());
+        assertTrue(scalingJob.isPresent());
     }
     
     @Test
@@ -150,6 +152,33 @@ public final class DistributedScalingJobServiceTest {
         return result;
     }
     
+    private ScalingCallback mockScalingCallback() {
+        return new ScalingCallback() {
+            
+            @Override
+            public void onSuccess() {
+            }
+            
+            @Override
+            public void onFailure() {
+            }
+        };
+    }
+    
+    private ScalingCallback mockScalingCallback(final AtomicBoolean successCallback) {
+        return new ScalingCallback() {
+            
+            @Override
+            public void onSuccess() {
+                successCallback.set(true);
+            }
+            
+            @Override
+            public void onFailure() {
+            }
+        };
+    }
+    
     @SneakyThrows(ReflectiveOperationException.class)
     private void resetRegistryRepositoryAvailable() {
         ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
new file mode 100644
index 0000000..f659188
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "ruleConfiguration": {
+    "source": {
+      "type": "shardingSphereJdbc",
+      "parameter": {
+        "dataSource": "dataSources:\n ds_0:\n  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  props:\n    jdbcUrl: jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false\n    username: root\n    password: 'password'\n    connectionTimeout: 30000\n    idleTimeout: 60000\n    maxLifetime: 1800000\n    maxPoolSize: 50\n    minPoolSize: 1\n    maintenanceIntervalMilliseconds: 30000\n    readOnly: false\n",
+        "rule": "rules:\n- !SHARDING\n  defaultDatabaseStrategy:\n    standard:\n      shardingAlgorithmName: inline\n      shardingColumn: user_id\n  tables:\n    t1:\n      actualDataNodes: ds_0.t1\n      keyGenerateStrategy:\n        column: order_id\n      logicTable: t1\n      tableStrategy:\n        standard:\n          shardingAlgorithmName: inline\n          shardingColumn: order_id\n    t2:\n      actualDataNodes: ds_0.t2\n      keyGenerateStrategy:\n        column: order_item_i [...]
+      }
+    },
+    "target": {
+      "type": "shardingSphereJdbc",
+      "parameter": {
+        "dataSource": "dataSources:\n ds_0:\n  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  props:\n    jdbcUrl: jdbc:mysql://127.0.0.1:3306/demo_ds_1?serverTimezone=UTC&useSSL=false\n    username: root\n    password: 'password'\n    connectionTimeout: 30000\n    idleTimeout: 60000\n    maxLifetime: 1800000\n    maxPoolSize: 50\n    minPoolSize: 1\n    maintenanceIntervalMilliseconds: 30000\n    readOnly: false\n",
+        "rule": "rules:\n- !SHARDING\n  defaultDatabaseStrategy:\n    standard:\n      shardingAlgorithmName: inline\n      shardingColumn: user_id\n  tables:\n    t1:\n      actualDataNodes: ds_0.t1\n      keyGenerateStrategy:\n        column: order_id\n      logicTable: t1\n      tableStrategy:\n        standard:\n          shardingAlgorithmName: inline\n          shardingColumn: order_id\n    t2:\n      actualDataNodes: ds_0.t2\n      keyGenerateStrategy:\n        column: order_item_i [...]
+      }
+    }
+  },
+  "jobConfiguration": {
+    "concurrency": 3
+  }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml
deleted file mode 100644
index 4a327d9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-schemaName: sharding_db
-
-dataSources:
-  ds_0:
-    url: jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-  ds_1:
-    url: jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-    
-rules:
-  - !SHARDING
-    tables:
-      t_order:
-        actualDataNodes: ds_$->{0..1}.t_order_$->{0..1}
-        logicTable: t_order
-        databaseStrategy:
-          standard:
-            shardingColumn: id
-            shardingAlgorithmName: t_order_db_algorith
-        tableStrategy:
-          standard:
-            shardingColumn: user_id
-            shardingAlgorithmName: t_order_tbl_algorith
-    shardingAlgorithms:
-      t_order_db_algorith:
-        type: INLINE
-        props:
-          algorithm-expression: ds_$->{id % 2}
-      t_order_tbl_algorith:
-        type: INLINE
-        props:
-          algorithm-expression: t_order_$->{user_id % 2}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml
deleted file mode 100644
index 2bc38f2..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-schemaName: sharding_db
-
-dataSources:
-  ds_0:
-    url: jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-  ds_1:
-    url: jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-  ds_2:
-    url: jdbc:mysql://192.168.0.3:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-  ds_3:
-    url: jdbc:mysql://192.168.0.4:3306/scaling?serverTimezone=UTC&useSSL=false
-    username: scaling
-    password: scaling
-    connectionTimeoutMilliseconds: 30000
-    idleTimeoutMilliseconds: 60000
-    maxLifetimeMilliseconds: 1800000
-    maxPoolSize: 50
-    minPoolSize: 1
-    maintenanceIntervalMilliseconds: 30000
-    
-rules:
-  - !SHARDING
-    tables:
-      t_order:
-        actualDataNodes: ds_$->{0..3}.t_order_$->{0..3}
-        logicTable: t_order
-        databaseStrategy:
-          standard:
-            shardingColumn: id
-            shardingAlgorithmName: t_order_db_algorith
-        tableStrategy:
-          standard:
-            shardingColumn: user_id
-            shardingAlgorithmName: t_order_tbl_algorith
-    shardingAlgorithms:
-      t_order_db_algorith:
-        type: INLINE
-        props:
-          algorithm-expression: ds_$->{id % 4}
-      t_order_tbl_algorith:
-        type: INLINE
-        props:
-          algorithm-expression: t_order_$->{user_id % 4}