You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2020/12/25 09:00:30 UTC

[shardingsphere] branch master updated: Add ScalingEnvironmentalManager (#8775)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a98884  Add ScalingEnvironmentalManager (#8775)
1a98884 is described below

commit 1a988844c772a6f0c64180dd943828d9b99da001
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Dec 25 17:00:02 2020 +0800

    Add ScalingEnvironmentalManager (#8775)
    
    * Add ScalingEnvironmentalManager
    
    * Add unit test for StandaloneScalingJobService
    
    * move function.
    
    * add unit test for DistributedScalingJobService
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/web/HttpServerHandler.java             | 14 ++++++
 .../sqlbuilder/AbstractScalingSQLBuilder.java      |  5 ++
 .../executor/sqlbuilder/ScalingSQLBuilder.java     |  8 ++++
 .../job/check/DataConsistencyCheckerFactory.java   |  2 +-
 .../environmental/ScalingEnvironmentalManager.java | 55 ++++++++++++++++++++++
 .../resume/FileSystemResumeBreakPointManager.java  |  2 +-
 .../core/service/AbstractScalingJobService.java    | 26 +++++-----
 .../scaling/core/service/ScalingJobService.java    |  6 ++-
 .../service/impl/DistributedScalingJobService.java | 15 +++---
 .../service/impl/StandaloneScalingJobService.java  | 26 +++-------
 .../impl/DistributedScalingJobServiceTest.java     |  6 +++
 .../impl/StandaloneScalingJobServiceTest.java      | 55 ++++++++++++++++++----
 12 files changed, 165 insertions(+), 55 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 27b7af3..5e6ba31 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
 import org.apache.shardingsphere.scaling.util.ResponseContentUtil;
 
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Optional;
 
@@ -80,6 +81,10 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
             checkJob(context, requestPath);
             return;
         }
+        if (requestPath.contains("/scaling/job/reset/")) {
+            resetJob(context, requestPath);
+            return;
+        }
         response(ResponseContentUtil.handleBadRequest("Not support request!"), context, HttpResponseStatus.BAD_REQUEST);
     }
     
@@ -115,6 +120,15 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
         }
     }
     
+    private void resetJob(final ChannelHandlerContext context, final String requestPath) {
+        try {
+            SCALING_JOB_SERVICE.reset(getJobId(requestPath));
+            response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
+        } catch (final ScalingJobNotFoundException | SQLException ex) {
+            response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
+        }
+    }
+    
     private long getJobId(final String requestPath) {
         return Long.parseLong(requestPath.split("/")[4]);
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
index 6f9edb742..8c4db1f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -124,6 +124,11 @@ public abstract class AbstractScalingSQLBuilder implements ScalingSQLBuilder {
         return sqlCacheMap.get(sqlCacheKey);
     }
     
+    @Override
+    public String buildTruncateSQL(final String tableName) {
+        return String.format("TRUNCATE TABLE %s", quote(tableName));
+    }
+    
     private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
         return String.format("DELETE FROM %s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
index 09c98fa..4efe09b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
@@ -54,6 +54,14 @@ public interface ScalingSQLBuilder {
     String buildDeleteSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
     
     /**
+     * Build truncate SQL.
+     *
+     * @param tableName table name
+     * @return truncate SQL
+     */
+    String buildTruncateSQL(String tableName);
+    
+    /**
      * Build count SQL.
      *
      * @param tableName table name
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
index 81ec835..6788b53 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/DataConsistencyCheckerFactory.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 public final class DataConsistencyCheckerFactory {
     
     /**
-     * create data consistency checker instance.
+     * Create data consistency checker instance.
      *
      * @param scalingJob scaling job
      * @return data consistency checker
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
new file mode 100644
index 0000000..42bf74f
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.environmental;
+
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
+import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Scaling environmental manager.
+ */
+public final class ScalingEnvironmentalManager {
+    
+    private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
+    
+    /**
+     * Reset target tables by truncating every table mapped in the job's task configurations.
+     *
+     * @param scalingJob scaling job
+     * @throws SQLException SQL exception
+     */
+    public void resetTargetTable(final ScalingJob scalingJob) throws SQLException {
+        Set<String> tables = scalingJob.getTaskConfigs().stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet());
+        try (DataSourceWrapper dataSource = dataSourceFactory.newInstance(scalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
+             Connection connection = dataSource.getConnection()) {
+            for (String each : tables) {
+                try (PreparedStatement preparedStatement = connection.prepareStatement(ScalingSQLBuilderFactory.newInstance(scalingJob.getDatabaseType()).buildTruncateSQL(each))) {
+                    preparedStatement.execute();
+                }
+            }
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
index 1daac29..284cbe2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets;
 public final class FileSystemResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
     
     public FileSystemResumeBreakPointManager(final String databaseType, final String taskPath) {
-        super(databaseType, taskPath.startsWith("/") ? ".scaling/" + taskPath : taskPath);
+        super(databaseType, taskPath.startsWith("/") ? ".scaling" + taskPath : taskPath);
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 81c22c9..d876b59 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -28,8 +28,10 @@ import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
+import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
 import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
+import java.sql.SQLException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -55,25 +57,16 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
     
     private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
         ScalingConfiguration scalingConfig = new ScalingConfiguration();
-        scalingConfig.setRuleConfiguration(
-                new RuleConfiguration(new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule), new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
+        scalingConfig.setRuleConfiguration(new RuleConfiguration(
+                new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule),
+                new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
         scalingConfig.setJobConfiguration(new JobConfiguration());
         return start(scalingConfig);
     }
     
     @Override
-    public void reset(final long jobId) {
-        // TODO reset target tables.
-    }
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param scalingJob scaling job
-     * @return data consistency check result
-     */
-    protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final ScalingJob scalingJob) {
-        DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(scalingJob);
+    public Map<String, DataConsistencyCheckResult> check(final long jobId) {
+        DataConsistencyChecker dataConsistencyChecker = DataConsistencyCheckerFactory.newInstance(getJob(jobId));
         Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.countCheck();
         if (result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
             Map<String, Boolean> dataCheckResult = dataConsistencyChecker.dataCheck();
@@ -82,6 +75,11 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
         return result;
     }
     
+    @Override
+    public void reset(final long jobId) throws SQLException {
+        new ScalingEnvironmentalManager().resetTargetTable(getJob(jobId));
+    }
+    
     @RequiredArgsConstructor
     private class JobFinishChecker implements Runnable {
         
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 0639458..a488f46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -59,7 +60,7 @@ public interface ScalingJobService {
     Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
     
     /**
-     * Stop a job.
+     * Stop job.
      *
      * @param jobId job id
      */
@@ -93,8 +94,9 @@ public interface ScalingJobService {
      * Reset target tables.
      *
      * @param jobId job id
+     * @throws SQLException SQL exception
      */
-    void reset(long jobId);
+    void reset(long jobId) throws SQLException;
     
     /**
      * remove job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 069a044..07d07d9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.scaling.core.service.impl;
 
+import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
@@ -37,7 +38,6 @@ import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -83,8 +83,12 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
     
     @Override
     public ScalingJob getJob(final long jobId) {
+        String data = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG));
+        if (Strings.isNullOrEmpty(data)) {
+            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
+        }
         ScalingJob result = new ScalingJob(jobId);
-        result.setScalingConfig(GSON.fromJson(REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG)), ScalingConfiguration.class));
+        result.setScalingConfig(GSON.fromJson(data, ScalingConfiguration.class));
         return result;
     }
     
@@ -117,11 +121,6 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> check(final long jobId) {
-        return dataConsistencyCheck(getJob(jobId));
-    }
-    
-    @Override
     public void remove(final long jobId) {
         REGISTRY_REPOSITORY.delete(ScalingTaskUtil.getScalingListenerPath(jobId));
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
index d70393d..9363a4a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
@@ -21,10 +21,9 @@ import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
-import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
+import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
 import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 
@@ -65,25 +64,22 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
     
     @Override
     public void stop(final long jobId) {
-        if (!scalingJobMap.containsKey(jobId)) {
-            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
-        }
+        ScalingJob scalingJob = getJob(jobId);
         scalingTaskSchedulerMap.get(jobId).stop();
-        scalingJobMap.get(jobId).setStatus(JobStatus.STOPPED.name());
+        scalingJob.setStatus(JobStatus.STOPPED.name());
     }
     
     @Override
     public ScalingJob getJob(final long jobId) {
+        if (!scalingJobMap.containsKey(jobId)) {
+            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
+        }
         return scalingJobMap.get(jobId);
     }
     
     @Override
     public JobProgress getProgress(final long jobId) {
-        if (!scalingJobMap.containsKey(jobId)) {
-            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
-        }
-        ScalingJob scalingJob = scalingJobMap.get(jobId);
-        JobProgress result = new JobProgress(jobId, scalingJob.getStatus());
+        JobProgress result = new JobProgress(jobId, getJob(jobId).getStatus());
         if (scalingTaskSchedulerMap.containsKey(jobId)) {
             result.getInventoryTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress());
             result.getIncrementalTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
@@ -92,14 +88,6 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> check(final long jobId) {
-        if (!scalingJobMap.containsKey(jobId)) {
-            throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
-        }
-        return dataConsistencyCheck(scalingJobMap.get(jobId));
-    }
-    
-    @Override
     public void remove(final long jobId) {
         stop(jobId);
         scalingJobMap.remove(jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 62258f6..c886d3b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
@@ -102,6 +103,11 @@ public final class DistributedScalingJobServiceTest {
         assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
     }
     
+    @Test(expected = ScalingJobNotFoundException.class)
+    public void assertGetNotExistJob() {
+        scalingJobService.getJob(0);
+    }
+    
     @Test
     public void assertGetProgress() {
         registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/config"), "{'ruleConfiguration':{'source':{},'target':{}},'jobConfiguration':{'running':true}}");
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index 6b93618..f5e5c7c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -21,6 +21,8 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
 import org.apache.shardingsphere.scaling.core.fixture.FixtureResumeBreakPointManager;
@@ -41,7 +43,12 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import javax.sql.DataSource;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Map;
 import java.util.Optional;
 
@@ -81,10 +88,15 @@ public final class StandaloneScalingJobServiceTest {
         assertThat(progress.getInventoryTaskProgress().size(), is(1));
     }
     
+    @Test(expected = ScalingJobNotFoundException.class)
+    public void assertGetNotExistJob() {
+        scalingJobService.getJob(0);
+    }
+    
     @Test
     @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
-    public void assertStopExistJob() {
+    public void assertStopJob() {
         Map<Long, ScalingJob> scalingJobMap = ReflectionUtil.getFieldValue(scalingJobService, "scalingJobMap", Map.class);
         Map<Long, ScalingTaskScheduler> scalingTaskSchedulerMap = ReflectionUtil.getFieldValue(scalingJobService, "scalingTaskSchedulerMap", Map.class);
         assertNotNull(scalingJobMap);
@@ -96,11 +108,6 @@ public final class StandaloneScalingJobServiceTest {
         verify(scalingJob).setStatus(JobStatus.STOPPED.name());
     }
     
-    @Test(expected = ScalingJobNotFoundException.class)
-    public void assertStopNotExistJob() {
-        scalingJobService.stop(0);
-    }
-    
     @Test
     public void assertListJobs() {
         assertThat(scalingJobService.listJobs().size(), is(0));
@@ -109,7 +116,7 @@ public final class StandaloneScalingJobServiceTest {
     }
     
     @Test
-    public void assertCheckExistJob() {
+    public void assertCheckJob() {
         Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
         assertTrue(scalingJobOptional.isPresent());
         ScalingJob scalingJob = scalingJobOptional.get();
@@ -119,9 +126,17 @@ public final class StandaloneScalingJobServiceTest {
         assertTrue(checkResult.isEmpty());
     }
     
-    @Test(expected = ScalingJobNotFoundException.class)
-    public void assertCheckNotExistJob() {
-        scalingJobService.check(0);
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertResetJob() {
+        Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+        assertTrue(scalingJobOptional.isPresent());
+        ScalingJob scalingJob = scalingJobOptional.get();
+        DataSourceConfiguration dataSourceConfig = scalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig();
+        initTableData(dataSourceConfig);
+        assertThat(countTableData(dataSourceConfig), is(2L));
+        scalingJobService.reset(scalingJob.getJobId());
+        assertThat(countTableData(dataSourceConfig), is(0L));
     }
     
     @SneakyThrows(IOException.class)
@@ -129,6 +144,26 @@ public final class StandaloneScalingJobServiceTest {
         return ScalingConfigurationUtil.initConfig("/config.json");
     }
     
+    private void initTableData(final DataSourceConfiguration dataSourceConfig) throws SQLException {
+        DataSource dataSource = new DataSourceManager().getDataSource(dataSourceConfig);
+        try (Connection connection = dataSource.getConnection();
+             Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS `t1`");
+            statement.execute("CREATE TABLE `t1` (id INT PRIMARY KEY, user_id VARCHAR(12))");
+            statement.execute("INSERT INTO `t1` (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+        }
+    }
+    
+    private long countTableData(final DataSourceConfiguration dataSourceConfig) throws SQLException {
+        DataSource dataSource = new DataSourceManager().getDataSource(dataSourceConfig);
+        try (Connection connection = dataSource.getConnection();
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM `t1`")) {
+            resultSet.next(); // an ungrouped COUNT(*) always yields exactly one row
+            return resultSet.getLong(1);
+        }
+    }
+    
     @After
     @SneakyThrows(ReflectiveOperationException.class)
     public void tearDown() {