You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/14 11:27:09 UTC

[shardingsphere] branch master updated: lazy initialization DataConsistencyChecker (#8612)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b4fc6ae  lazy initialization DataConsistencyChecker (#8612)
b4fc6ae is described below

commit b4fc6ae3b6663d918401708b3f86e7c5e4d86146
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Dec 14 19:26:41 2020 +0800

    lazy initialization DataConsistencyChecker (#8612)
    
    * delay init DataConsistencyChecker.
    
    * Optimize name and javadoc.
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/web/HttpServerHandler.java             |  6 +--
 .../core/execute/engine/TaskExecuteEngine.java     | 14 +++----
 .../scaling/core/job/ScalingJob.java               |  6 +--
 .../job/check/DataConsistencyCheckerFactory.java   |  5 +--
 .../core/job/preparer/ScalingJobPreparer.java      | 28 +++++--------
 .../core/service/AbstractScalingJobService.java    |  4 +-
 .../scaling/core/service/ScalingJobService.java    |  6 +--
 .../core/execute/engine/TaskExecuteEngineTest.java |  6 +--
 .../check/AbstractDataConsistencyCheckerTest.java  |  6 +--
 .../resumer/ScalingPositionResumerTest.java        |  4 +-
 .../impl/DistributedScalingJobServiceTest.java     | 33 ++++-----------
 .../impl/StandaloneScalingJobServiceTest.java      | 47 ++++++++--------------
 .../core/util/ScalingConfigurationUtil.java        |  2 +-
 13 files changed, 63 insertions(+), 104 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 4e58df8..27b7af3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -84,9 +84,9 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
     }
     
     private void startJob(final ChannelHandlerContext context, final String requestBody) {
-        Optional<ScalingJob> shardingScalingJob = SCALING_JOB_SERVICE.start(GSON.fromJson(requestBody, ScalingConfiguration.class));
-        Preconditions.checkState(shardingScalingJob.isPresent());
-        response(ResponseContentUtil.build(shardingScalingJob.get()), context, HttpResponseStatus.OK);
+        Optional<ScalingJob> scalingJob = SCALING_JOB_SERVICE.start(GSON.fromJson(requestBody, ScalingConfiguration.class));
+        Preconditions.checkState(scalingJob.isPresent());
+        response(ResponseContentUtil.build(scalingJob.get()), context, HttpResponseStatus.OK);
     }
     
     private void listJobs(final ChannelHandlerContext context) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
index 922224e..1ccc068 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngine.java
@@ -64,7 +64,7 @@ public final class TaskExecuteEngine {
     }
     
     /**
-     * Submit a {@code ShardingScalingExecutor} without callback to execute.
+     * Submit a {@code ScalingExecutor} without callback to execute.
      *
      * @param scalingExecutor scaling executor
      * @return execute future
@@ -74,7 +74,7 @@ public final class TaskExecuteEngine {
     }
     
     /**
-     * Submit a {@code ShardingScalingExecutor} with callback {@code ExecuteCallback} to execute.
+     * Submit a {@code ScalingExecutor} with callback {@code ExecuteCallback} to execute.
      *
      * @param scalingExecutor scaling executor
      * @param executeCallback execute callback
@@ -87,15 +87,15 @@ public final class TaskExecuteEngine {
     }
     
     /**
-     * Submit a collection of {@code ShardingScalingExecutor} with callback {@code ExecuteCallback} to execute.
+     * Submit a collection of {@code ScalingExecutor} with callback {@code ExecuteCallback} to execute.
      *
-     * @param shardingScalingExecutors scaling executor
+     * @param scalingExecutors scaling executor
      * @param executeCallback execute callback
      * @return execute future of all
      */
-    public Future<?> submitAll(final Collection<? extends ScalingExecutor> shardingScalingExecutors, final ExecuteCallback executeCallback) {
-        Collection<ListenableFuture<?>> listenableFutures = new ArrayList<>(shardingScalingExecutors.size());
-        for (ScalingExecutor each : shardingScalingExecutors) {
+    public Future<?> submitAll(final Collection<? extends ScalingExecutor> scalingExecutors, final ExecuteCallback executeCallback) {
+        Collection<ListenableFuture<?>> listenableFutures = new ArrayList<>(scalingExecutors.size());
+        for (ScalingExecutor each : scalingExecutors) {
             ListenableFuture<?> listenableFuture = executorService.submit(each);
             listenableFutures.add(listenableFuture);
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index f6d2eed..0ec7f83 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,7 +21,6 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
 import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -44,6 +43,8 @@ public final class ScalingJob {
     
     private int shardingItem;
     
+    private String databaseType;
+    
     private final transient List<TaskConfiguration> taskConfigs = new LinkedList<>();
     
     private final transient List<ScalingTask> inventoryTasks = new LinkedList<>();
@@ -52,8 +53,6 @@ public final class ScalingJob {
     
     private transient ScalingConfiguration scalingConfig;
     
-    private transient DataConsistencyChecker dataConsistencyChecker;
-    
     private String status = JobStatus.RUNNING.name();
     
     public ScalingJob() {
@@ -69,6 +68,7 @@ public final class ScalingJob {
         this.scalingConfig = scalingConfig;
         shardingItem = scalingConfig.getJobConfiguration().getShardingItem();
         taskConfigs.addAll(TaskConfigurationUtil.toTaskConfigs(scalingConfig));
+        databaseType = taskConfigs.get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
     }
     
     private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
index 78e8304..81ec835 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
@@ -30,13 +30,12 @@ public final class DataConsistencyCheckerFactory {
     /**
      * create data consistency checker instance.
      *
-     * @param databaseType database type
      * @param scalingJob scaling job
      * @return data consistency checker
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static DataConsistencyChecker newInstance(final String databaseType, final ScalingJob scalingJob) {
-        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+    public static DataConsistencyChecker newInstance(final ScalingJob scalingJob) {
+        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(scalingJob.getDatabaseType());
         return scalingEntry.getDataConsistencyCheckerClass().getConstructor(ScalingJob.class).newInstance(scalingJob);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 80d9eba..3a53d8f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -24,8 +24,6 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
@@ -60,26 +58,24 @@ public final class ScalingJobPreparer {
      * @param scalingJob scaling job
      */
     public void prepare(final ScalingJob scalingJob) {
-        String databaseType = scalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig().getDatabaseType().getName();
         try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
-            checkDataSources(databaseType, dataSourceManager);
-            ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(databaseType, scalingJob);
+            checkDataSources(scalingJob.getDatabaseType(), dataSourceManager);
+            ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
             if (resumeBreakPointManager.isResumable()) {
                 scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
             } else {
-                initIncrementalTasks(databaseType, scalingJob, dataSourceManager);
-                initInventoryTasks(databaseType, scalingJob, dataSourceManager);
+                initIncrementalTasks(scalingJob, dataSourceManager);
+                initInventoryTasks(scalingJob, dataSourceManager);
                 scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
             }
-            scalingJob.setDataConsistencyChecker(initDataConsistencyChecker(databaseType, scalingJob));
         } catch (final PrepareFailedException ex) {
             log.error("Preparing scaling job {} failed", scalingJob.getJobId(), ex);
             scalingJob.setStatus(JobStatus.PREPARING_FAILURE.name());
         }
     }
     
-    private ResumeBreakPointManager getResumeBreakPointManager(final String databaseType, final ScalingJob scalingJob) {
-        return ResumeBreakPointManagerFactory.newInstance(databaseType,
+    private ResumeBreakPointManager getResumeBreakPointManager(final ScalingJob scalingJob) {
+        return ResumeBreakPointManagerFactory.newInstance(scalingJob.getDatabaseType(),
                 ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), ScalingConstant.POSITION, scalingJob.getShardingItem()));
     }
     
@@ -90,23 +86,19 @@ public final class ScalingJobPreparer {
         dataSourceChecker.checkVariable(dataSourceManager.getSourceDataSources().values());
     }
     
-    private void initInventoryTasks(final String databaseType, final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+    private void initInventoryTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
         List<ScalingTask> allInventoryTasks = new LinkedList<>();
         for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
-            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(databaseType, each, dataSourceManager));
+            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob.getDatabaseType(), each, dataSourceManager));
         }
         scalingJob.getInventoryTasks().addAll(allInventoryTasks);
     }
     
-    private void initIncrementalTasks(final String databaseType, final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+    private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
         for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
             DataSourceConfiguration dataSourceConfig = each.getDumperConfig().getDataSourceConfig();
-            each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(databaseType, dataSourceManager.getDataSource(dataSourceConfig)));
+            each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(scalingJob.getDatabaseType(), dataSourceManager.getDataSource(dataSourceConfig)));
             scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
         }
     }
-    
-    private DataConsistencyChecker initDataConsistencyChecker(final String databaseType, final ScalingJob scalingJob) {
-        return DataConsistencyCheckerFactory.newInstance(databaseType, scalingJob);
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 93f65f7..81c22c9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
+import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
 import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.Map;
@@ -46,7 +47,6 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
     public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
         Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
         if (!result.isPresent()) {
-            scalingCallback.onSuccess();
             return result;
         }
         FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 3, 1, TimeUnit.MINUTES);
@@ -73,7 +73,7 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
      * @return data consistency check result
      */
     protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final ScalingJob scalingJob) {
-        DataConsistencyChecker dataConsistencyChecker = scalingJob.getDataConsistencyChecker();
+        DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
         Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
         if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
             Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 87aaa3a..0639458 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -32,7 +32,7 @@ import java.util.Optional;
 public interface ScalingJobService {
     
     /**
-     * Get {@code ShardingScalingJob} list.
+     * Get {@code ScalingJob} list.
      *
      * @return scaling job service list
      */
@@ -66,10 +66,10 @@ public interface ScalingJobService {
     void stop(long jobId);
     
     /**
-     * Get {@code ShardingScalingJob} by id.
+     * Get {@code ScalingJob} by id.
      *
      * @param jobId job id
-     * @return {@code ShardingScalingJob} instance
+     * @return {@code ScalingJob} instance
      */
     ScalingJob getJob(long jobId);
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
index da891f7..056ec58 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/TaskExecuteEngineTest.java
@@ -34,7 +34,7 @@ public final class TaskExecuteEngineTest {
         TaskExecuteEngine executeEngine = TaskExecuteEngine.newFixedThreadInstance(2);
         try {
             for (int i = 0; i < 5; i++) {
-                Future<?> submit = executeEngine.submit(mockShardingScalingExecutor());
+                Future<?> submit = executeEngine.submit(mockScalingExecutor());
                 assertFalse(submit.isCancelled());
             }
         } catch (final RejectedExecutionException ex) {
@@ -47,7 +47,7 @@ public final class TaskExecuteEngineTest {
         TaskExecuteEngine executeEngine = TaskExecuteEngine.newFixedThreadInstance(2);
         try {
             for (int i = 0; i < 5; i++) {
-                Future<?> submit = executeEngine.submitAll(Collections.singletonList(mockShardingScalingExecutor()), mockExecuteCallback());
+                Future<?> submit = executeEngine.submitAll(Collections.singletonList(mockScalingExecutor()), mockExecuteCallback());
                 assertFalse(submit.isCancelled());
             }
         } catch (final RejectedExecutionException ex) {
@@ -69,7 +69,7 @@ public final class TaskExecuteEngineTest {
         };
     }
     
-    private ScalingExecutor mockShardingScalingExecutor() {
+    private ScalingExecutor mockScalingExecutor() {
         return new ScalingExecutor() {
             @Override
             public void run() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
index 3fe4084..8593a1a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
@@ -39,8 +39,8 @@ public final class AbstractDataConsistencyCheckerTest {
     
     @Test
     public void assertCountCheck() {
-        ScalingJob scalingJob = mockShardingScalingJob();
-        DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance("H2", scalingJob);
+        ScalingJob scalingJob = mockScalingJob();
+        DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
         initTableData(scalingJob.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig());
         initTableData(scalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig());
         Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.countCheck();
@@ -60,7 +60,7 @@ public final class AbstractDataConsistencyCheckerTest {
     }
     
     @SneakyThrows(IOException.class)
-    private ScalingJob mockShardingScalingJob() {
+    private ScalingJob mockScalingJob() {
         return ScalingConfigurationUtil.initJob("/config.json");
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
index 1b0b9d3..60ee900 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
@@ -48,7 +48,7 @@ public final class ScalingPositionResumerTest {
     @Before
     public void setUp() {
         ScalingContext.getInstance().init(new ServerConfiguration());
-        scalingJob = mockShardingScalingJob();
+        scalingJob = mockScalingJob();
         scalingPositionResumer = new ScalingPositionResumer();
     }
     
@@ -71,7 +71,7 @@ public final class ScalingPositionResumerTest {
     }
     
     @SneakyThrows(IOException.class)
-    private ScalingJob mockShardingScalingJob() {
+    private ScalingJob mockScalingJob() {
         return ScalingConfigurationUtil.initJob("/config.json");
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 345fe89..62258f6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -42,7 +42,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
@@ -73,19 +72,17 @@ public final class DistributedScalingJobServiceTest {
     
     @Test
     public void assertStartWithScalingConfig() {
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
+        Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJob.isPresent());
+        assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
     }
     
     @Test
     public void assertStartWithCallbackImmediately() {
         ScalingConfiguration scalingConfig = mockScalingConfiguration();
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
-        AtomicBoolean successCallback = new AtomicBoolean();
-        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback(successCallback));
+        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback());
         assertFalse(scalingJob.isPresent());
-        assertTrue(successCallback.get());
     }
     
     @Test
@@ -99,10 +96,10 @@ public final class DistributedScalingJobServiceTest {
     
     @Test
     public void assertStop() {
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        scalingJobService.stop(shardingScalingJob.get().getJobId());
-        assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
+        Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJob.isPresent());
+        scalingJobService.stop(scalingJob.get().getJobId());
+        assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
     }
     
     @Test
@@ -165,20 +162,6 @@ public final class DistributedScalingJobServiceTest {
         };
     }
     
-    private ScalingCallback mockScalingCallback(final AtomicBoolean successCallback) {
-        return new ScalingCallback() {
-            
-            @Override
-            public void onSuccess() {
-                successCallback.set(true);
-            }
-            
-            @Override
-            public void onFailure() {
-            }
-        };
-    }
-    
     @SneakyThrows(ReflectiveOperationException.class)
     private void resetRegistryRepositoryAvailable() {
         ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index df60556..cbdbd41 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.service.impl;
 
-import com.google.common.collect.Maps;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.resume.FakeResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.IncrementalPositionResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
@@ -60,9 +58,9 @@ public final class StandaloneScalingJobServiceTest {
     
     @Test
     public void assertStartJob() {
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        long jobId = shardingScalingJob.get().getJobId();
+        Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJob.isPresent());
+        long jobId = scalingJob.get().getJobId();
         JobProgress progress = scalingJobService.getProgress(jobId);
         assertThat(progress.getIncrementalTaskProgress().size(), is(1));
         assertThat(progress.getInventoryTaskProgress().size(), is(1));
@@ -70,9 +68,9 @@ public final class StandaloneScalingJobServiceTest {
     
     @Test
     public void assertStopExistJob() {
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        long jobId = shardingScalingJob.get().getJobId();
+        Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJob.isPresent());
+        long jobId = scalingJob.get().getJobId();
         scalingJobService.stop(jobId);
         JobProgress progress = scalingJobService.getProgress(jobId);
         assertThat(progress.getStatus(), is(JobStatus.STOPPED.name()));
@@ -94,9 +92,9 @@ public final class StandaloneScalingJobServiceTest {
     @Test
     public void assertIncrementalTasksOnly() throws NoSuchFieldException, IllegalAccessException {
         ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", IncrementalPositionResumeBreakPointManager.class);
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        long jobId = shardingScalingJob.get().getJobId();
+        Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJob.isPresent());
+        long jobId = scalingJob.get().getJobId();
         JobProgress progress = scalingJobService.getProgress(jobId);
         assertThat(progress.getIncrementalTaskProgress().size(), is(1));
         assertThat(progress.getInventoryTaskProgress().size(), is(1));
@@ -105,26 +103,13 @@ public final class StandaloneScalingJobServiceTest {
     
     @Test
     public void assertCheckExistJob() {
-        Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
-        assertTrue(shardingScalingJob.isPresent());
-        shardingScalingJob.get().setDataConsistencyChecker(new DataConsistencyChecker() {
-            @Override
-            public Map<String, DataConsistencyCheckResult> countCheck() {
-                Map<String, DataConsistencyCheckResult> result = Maps.newHashMapWithExpectedSize(1);
-                result.put("t1", new DataConsistencyCheckResult(1, 1));
-                return result;
-            }
-            
-            @Override
-            public Map<String, Boolean> dataCheck() {
-                Map<String, Boolean> result = Maps.newHashMapWithExpectedSize(1);
-                result.put("t1", true);
-                return result;
-            }
-        });
-        Map<String, DataConsistencyCheckResult> checkResult = scalingJobService.check(shardingScalingJob.get().getJobId());
-        assertTrue(checkResult.get("t1").isCountValid());
-        assertTrue(checkResult.get("t1").isDataValid());
+        Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJobOptional.isPresent());
+        ScalingJob scalingJob = scalingJobOptional.get();
+        scalingJob.setDatabaseType("H2");
+        scalingJob.getTaskConfigs().clear();
+        Map<String, DataConsistencyCheckResult> checkResult = scalingJobService.check(scalingJob.getJobId());
+        assertTrue(checkResult.isEmpty());
     }
     
     @Test(expected = ScalingJobNotFoundException.class)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
index 6b0d13c..6d36e1c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
@@ -53,7 +53,7 @@ public final class ScalingConfigurationUtil {
      * Init job from config file.
      *
      * @param configFile config file
-     * @return ShardingScalingJob
+     * @return scaling job
      * @throws IOException IO exception
      */
     public static ScalingJob initJob(final String configFile) throws IOException {