You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/28 03:02:04 UTC

[shardingsphere] branch master updated: Rewrite scaling position persist and resume (#9195)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cb82953  Rewrite scaling position persist and resume (#9195)
cb82953 is described below

commit cb82953e4d384907965198a7a42fcc8551d77905
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 28 11:01:26 2021 +0800

    Rewrite scaling position persist and resume (#9195)
    
    * Remove scaling position manager and resumer.
    
    * Rewrite scaling position persist and resume.
    
    * update exception message.
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/fixture/FixtureH2ScalingEntry.java     |   6 +-
 ...anager.java => FixturePositionInitializer.java} |  13 +-
 .../fixture/FixtureResumeBreakPointManager.java    |  38 -----
 .../scaling/web/HttpServerHandlerTest.java         |  11 --
 .../scaling/core/api/JobSchedulerCenter.java       |  79 +++++++++
 .../RegistryRepositoryAPI.java}                    |  44 +++--
 .../core/api/RegistryRepositoryAPIImpl.java        |  59 +++++++
 .../scaling/core/config/DumperConfiguration.java   |   4 +-
 .../scaling/core/config/JobConfiguration.java      |   2 +
 .../execute/executor/AbstractScalingExecutor.java  |   5 -
 .../executor/dumper/AbstractJDBCDumper.java        |   2 +-
 .../scaling/core/job/ScalingJob.java               |   6 +-
 .../scaling/core/job/position/JobPosition.java     | 166 +++++++++++++++++++
 ...sitionManager.java => PositionInitializer.java} |  29 ++--
 ...actory.java => PositionInitializerFactory.java} |  32 ++--
 .../resume/AbstractResumeBreakPointManager.java    | 157 ------------------
 .../resume/FileSystemResumeBreakPointManager.java  |  51 ------
 .../RegistryRepositoryResumeBreakPointManager.java |  43 -----
 .../position/resume/ResumeBreakPointManager.java   |  75 ---------
 .../resume/ResumeBreakPointManagerFactory.java     |  55 -------
 .../core/job/preparer/ScalingJobPreparer.java      |  50 +++---
 .../checker/AbstractDataSourceChecker.java         |   2 +-
 .../preparer/resumer/ScalingPositionResumer.java   | 116 -------------
 .../preparer/splitter/InventoryTaskSplitter.java   |  87 ++++++----
 .../scaling/core/job/task/ScalingTask.java         |   8 +-
 .../core/job/task/incremental/IncrementalTask.java |  20 ++-
 .../core/job/task/inventory/InventoryTask.java     |  18 +-
 .../core/schedule/ScalingTaskScheduler.java        |   1 -
 .../scaling/core/spi/ScalingEntry.java             |   8 +-
 .../scaling/core/utils/RdbmsConfigurationUtil.java |  12 +-
 .../scaling/core/utils/ReflectionUtil.java         |  16 ++
 .../core/utils/ScalingConfigurationUtil.java       | 181 +++++++++++++++++++++
 .../scaling/core/utils/ScalingTaskUtil.java        |   2 +-
 .../core/fixture/FixtureH2ScalingEntry.java        |   6 +-
 ...anager.java => FixturePositionInitializer.java} |  13 +-
 .../fixture/FixtureResumeBreakPointManager.java    |  38 -----
 .../AbstractResumeBreakPointManagerTest.java       | 108 ------------
 .../FileSystemResumeBreakPointManagerTest.java     |  54 ------
 ...istryRepositoryResumeBreakPointManagerTest.java |  68 --------
 .../resumer/ScalingPositionResumerTest.java        |  76 ---------
 .../splitter/InventoryTaskSplitterTest.java        |  32 ++--
 .../job/task/incremental/IncrementalTaskTest.java  |   7 +-
 .../core/job/task/inventory/InventoryTaskTest.java |   9 +-
 .../impl/StandaloneScalingJobServiceTest.java      |  11 --
 .../scaling/mysql/MySQLScalingEntry.java           |   8 +-
 ...nManager.java => MySQLPositionInitializer.java} |  36 +---
 .../scaling/mysql/MySQLScalingEntryTest.java       |   4 +-
 ...Test.java => MySQLPositionInitializerTest.java} |  25 +--
 .../scaling/postgresql/PostgreSQLScalingEntry.java |   8 +-
 ...ger.java => PostgreSQLPositionInitializer.java} |  31 +---
 .../postgresql/component/PostgreSQLWalDumper.java  |   2 +-
 .../postgresql/PostgreSQLScalingEntryTest.java     |   4 +-
 ...java => PostgreSQLPositionInitializerTest.java} |  25 +--
 .../component/PostgreSQLWalDumperTest.java         |   4 +-
 .../scaling/elasticjob/job/ScalingElasticJob.java  |   3 +
 55 files changed, 754 insertions(+), 1216 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index 478558b..99229ac 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
@@ -39,8 +39,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
-        return FixturePositionManager.class;
+    public Class<? extends PositionInitializer> getPositionInitializer() {
+        return FixturePositionInitializer.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
similarity index 77%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
index ac69629..201d333 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
@@ -18,17 +18,14 @@
 package org.apache.shardingsphere.scaling.fixture;
 
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 
 import javax.sql.DataSource;
 
-public final class FixturePositionManager extends PositionManager {
+public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
     
-    public FixturePositionManager(final DataSource dataSource) {
-        super(new PlaceholderPosition());
-    }
-    
-    public FixturePositionManager(final String position) {
-        super(new PlaceholderPosition());
+    @Override
+    public PlaceholderPosition init(final DataSource dataSource) {
+        return new PlaceholderPosition();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java
deleted file mode 100644
index 2233312..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.fixture;
-
-import org.apache.shardingsphere.scaling.core.job.position.resume.AbstractResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-
-public final class FixtureResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-    
-    public FixtureResumeBreakPointManager(final String databaseType, final String taskPath) {
-        super(databaseType, taskPath);
-    }
-    
-    @Override
-    public String getPosition(final String path) {
-        return null;
-    }
-    
-    @Override
-    public void persistPosition(final String path, final String data) {
-    
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 60abb85..4f60b9e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -32,12 +32,8 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
-import org.apache.shardingsphere.scaling.core.job.position.resume.FileSystemResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
 import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.apache.shardingsphere.scaling.fixture.FixtureResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.util.ScalingConfigurationUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,7 +63,6 @@ public final class HttpServerHandlerTest {
     public void setUp() {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(TaskExecuteEngine.class));
-        ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FixtureResumeBreakPointManager.class);
         httpServerHandler = new HttpServerHandler();
     }
     
@@ -190,10 +185,4 @@ public final class HttpServerHandlerTest {
         JsonObject jsonObject = new Gson().fromJson(fullHttpResponse.content().toString(CharsetUtil.UTF_8), JsonObject.class);
         return jsonObject.get("model").getAsJsonObject().get("jobId").getAsLong();
     }
-    
-    @After
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void tearDown() {
-        ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FileSystemResumeBreakPointManager.class);
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java
new file mode 100644
index 0000000..9db9feb
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Job scheduler center.
+ */
+@Slf4j
+public final class JobSchedulerCenter {
+    
+    private static final Map<String, ScalingJob> SCALING_JOB_MAP = Maps.newConcurrentMap();
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
+    
+    private static final RegistryRepositoryAPI REGISTRY_REPOSITORY_API = new RegistryRepositoryAPIImpl();
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 1, 1, TimeUnit.MINUTES);
+    }
+    
+    /**
+     * Add job.
+     *
+     * @param scalingJob scheduler job
+     */
+    public static void addJob(final ScalingJob scalingJob) {
+        SCALING_JOB_MAP.put(String.format("%d-%d", scalingJob.getJobId(), scalingJob.getShardingItem()), scalingJob);
+    }
+    
+    /**
+     * Remove job.
+     *
+     * @param scalingJob scheduler job
+     */
+    public static void removeJob(final ScalingJob scalingJob) {
+        SCALING_JOB_MAP.remove(String.format("%d-%d", scalingJob.getJobId(), scalingJob.getShardingItem()));
+    }
+    
+    private static final class PersistJobContextRunnable implements Runnable {
+        
+        @Override
+        public void run() {
+            for (Map.Entry<String, ScalingJob> entry : SCALING_JOB_MAP.entrySet()) {
+                try {
+                    REGISTRY_REPOSITORY_API.persistJobPosition(entry.getValue());
+                    // CHECKSTYLE:OFF
+                } catch (final Exception ex) {
+                    // CHECKSTYLE:ON
+                    log.error("persist job {} context failed.", entry.getKey(), ex);
+                }
+            }
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
similarity index 57%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index e685d50..5227060 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -15,31 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.api;
 
-import lombok.Getter;
-import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
 
 /**
- * Job configuration.
+ * Registry repository API.
  */
-@Setter
-@Getter
-public final class JobConfiguration {
-    
-    private Long jobId;
-    
-    private int concurrency = 3;
-    
-    private int retryTimes = 3;
-    
-    private String[] shardingTables;
-    
-    private Integer shardingItem;
-    
-    private int shardingSize = 1000 * 10000;
-    
-    private boolean running = true;
-    
-    private WorkflowConfiguration workflowConfig;
+public interface RegistryRepositoryAPI {
+    
+    /**
+     * persist job position.
+     *
+     * @param scalingJob scaling job
+     */
+    void persistJobPosition(ScalingJob scalingJob);
+    
+    /**
+     * Get job position.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job position
+     */
+    JobPosition getJobPosition(long jobId, int shardingItem);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
new file mode 100644
index 0000000..54c3ce9
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+
+import java.util.stream.Collectors;
+
+/**
+ * Registry repository API impl.
+ */
+@Slf4j
+public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
+    
+    private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
+    
+    @Override
+    public void persistJobPosition(final ScalingJob scalingJob) {
+        JobPosition jobPosition = new JobPosition();
+        jobPosition.setStatus(scalingJob.getStatus());
+        jobPosition.setDatabaseType(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType());
+        jobPosition.setIncrementalPositions(scalingJob.getIncrementalTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
+        jobPosition.setInventoryPositions(scalingJob.getInventoryTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
+        REGISTRY_REPOSITORY.persist(ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), scalingJob.getShardingItem()), jobPosition.toJson());
+    }
+    
+    @Override
+    public JobPosition getJobPosition(final long jobId, final int shardingItem) {
+        String data = null;
+        try {
+            data = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, shardingItem));
+        } catch (final NullPointerException ex) {
+            log.info("job {}-{} without break point.", jobId, shardingItem);
+        }
+        return Strings.isNullOrEmpty(data) ? null : JobPosition.fromJson(data);
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
index d6ed7fa..77ad701 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.config;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 import java.util.Map;
 
@@ -35,7 +35,7 @@ public class DumperConfiguration {
     
     private ScalingDataSourceConfiguration dataSourceConfig;
     
-    private PositionManager positionManager;
+    private Position<?> position;
     
     private Map<String, String> tableNameMap;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index e685d50..82057b5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -41,5 +41,7 @@ public final class JobConfiguration {
     
     private boolean running = true;
     
+    private String databaseType;
+    
     private WorkflowConfiguration workflowConfig;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
index 5b6325f..55756a1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 /**
  * Abstract scaling executor.
@@ -33,10 +32,6 @@ public abstract class AbstractScalingExecutor implements ScalingExecutor {
     @Getter(AccessLevel.PROTECTED)
     private boolean running;
     
-    private String taskId;
-    
-    private PositionManager positionManager;
-    
     @Override
     public void start() {
         running = true;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index b2d9ff1..61a8648 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -111,7 +111,7 @@ public abstract class AbstractJDBCDumper extends AbstractScalingExecutor impleme
         if (null == inventoryDumperConfig.getPrimaryKey()) {
             return new PlaceholderPosition();
         }
-        return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfig.getPositionManager().getPosition()).getEndValue());
+        return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfig.getPosition()).getEndValue());
     }
     
     protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index ce5656ec..ab65d9a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
 import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -46,6 +46,8 @@ public final class ScalingJob {
     
     private String databaseType;
     
+    private JobPosition initPosition;
+    
     private final transient List<TaskConfiguration> taskConfigs = new LinkedList<>();
     
     private final transient List<ScalingTask> inventoryTasks = new LinkedList<>();
@@ -56,8 +58,6 @@ public final class ScalingJob {
     
     private String status = JobStatus.RUNNING.name();
     
-    private transient ResumeBreakPointManager resumeBreakPointManager;
-    
     public ScalingJob() {
         this(generateKey());
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java
new file mode 100644
index 0000000..baeb112
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Position group.
+ */
+@Getter
+@Setter
+public final class JobPosition {
+    
+    private static final Gson GSON = new Gson();
+    
+    private static final Gson INVENTORY_POSITION_ADAPTED_GSON = new GsonBuilder().registerTypeHierarchyAdapter(Position.class, new InventoryPositionTypeAdapter()).create();
+    
+    private String status;
+    
+    private String databaseType;
+    
+    private Map<String, Position<?>> inventoryPositions;
+    
+    private Map<String, Position<?>> incrementalPositions;
+    
+    /**
+     * Get incremental position.
+     *
+     * @param dataSourceName data source name
+     * @return incremental position
+     */
+    public Position<?> getIncrementalPosition(final String dataSourceName) {
+        return incrementalPositions.get(dataSourceName);
+    }
+    
+    /**
+     * Get inventory position.
+     *
+     * @param tableName table name
+     * @return inventory position
+     */
+    public Map<String, Position<?>> getInventoryPosition(final String tableName) {
+        Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", tableName));
+        return inventoryPositions.entrySet().stream()
+                .filter(entry -> pattern.matcher(entry.getKey()).find())
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+    
+    /**
+     * To json.
+     *
+     * @return json data
+     */
+    public String toJson() {
+        JsonObject result = new JsonObject();
+        result.addProperty("status", status);
+        result.addProperty("databaseType", databaseType);
+        result.add("inventory", getInventoryJson());
+        result.add("incremental", getIncrementalJson());
+        return result.toString();
+    }
+    
+    private JsonObject getInventoryJson() {
+        JsonObject result = new JsonObject();
+        JsonArray finished = new JsonArray();
+        JsonObject unfinished = new JsonObject();
+        for (Map.Entry<String, Position<?>> entry : inventoryPositions.entrySet()) {
+            if (entry.getValue() instanceof FinishedPosition) {
+                finished.add(entry.getKey());
+                continue;
+            }
+            unfinished.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getValue().getClass()));
+        }
+        result.add("finished", finished);
+        result.add("unfinished", unfinished);
+        return result;
+    }
+    
+    private JsonObject getIncrementalJson() {
+        JsonObject result = new JsonObject();
+        for (Map.Entry<String, Position<?>> entry : incrementalPositions.entrySet()) {
+            result.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getClass()));
+        }
+        return result;
+    }
+    
+    /**
+     * From json.
+     *
+     * @param data json data
+     * @return job position
+     */
+    public static JobPosition fromJson(final String data) {
+        JobPosition result = new JobPosition();
+        JsonObject jsonObject = GSON.fromJson(data, JsonObject.class);
+        result.setStatus(jsonObject.get("status").getAsString());
+        result.setDatabaseType(jsonObject.get("databaseType").getAsString());
+        result.setInventoryPositions(getInventoryPositions(jsonObject.get("inventory").getAsJsonObject()));
+        result.setIncrementalPositions(getIncrementalPositions(jsonObject.get("incremental").getAsJsonObject(), jsonObject.get("databaseType").getAsString()));
+        return result;
+    }
+    
+    private static Map<String, Position<?>> getInventoryPositions(final JsonObject inventory) {
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.add("inventory", inventory);
+        return INVENTORY_POSITION_ADAPTED_GSON.fromJson(jsonObject, JobPosition.class).getInventoryPositions();
+    }
+    
+    private static Map<String, Position<?>> getIncrementalPositions(final JsonObject incremental, final String databaseType) {
+        Class<?> incrementalPositionClass = PositionInitializerFactory.getPositionClass(databaseType);
+        Map<String, Position<?>> result = Maps.newHashMap();
+        for (String each : incremental.keySet()) {
+            result.put(each, (Position<?>) GSON.fromJson(incremental.get(each), incrementalPositionClass));
+        }
+        return result;
+    }
+    
+    private static class InventoryPositionTypeAdapter extends TypeAdapter<Position<?>> {
+        
+        @Override
+        public void write(final JsonWriter out, final Position<?> value) throws IOException {
+            if (value instanceof PrimaryKeyPosition) {
+                new PrimaryKeyPosition.PositionTypeAdapter().write(out, (PrimaryKeyPosition) value);
+            } else if (value instanceof PlaceholderPosition) {
+                new PlaceholderPosition.PositionTypeAdapter().write(out, (PlaceholderPosition) value);
+            }
+        }
+        
+        @Override
+        public Position<?> read(final JsonReader in) throws IOException {
+            in.beginArray();
+            Position<?> result = in.hasNext() ? new PrimaryKeyPosition(in.nextLong(), in.nextLong()) : new PlaceholderPosition();
+            in.endArray();
+            return result;
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
similarity index 69%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 3f2597c..4727d32 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -17,27 +17,20 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import lombok.Getter;
-import lombok.Setter;
-
 import javax.sql.DataSource;
+import java.sql.SQLException;
 
 /**
- * Scaling position manager.
+ * Position initializer.
  */
-@Getter
-@Setter
-public class PositionManager {
-    
-    private DataSource dataSource;
-    
-    private Position<?> position;
-    
-    public PositionManager(final DataSource dataSource) {
-        this.dataSource = dataSource;
-    }
+public interface PositionInitializer<T extends Position<?>> {
     
-    public PositionManager(final Position<?> position) {
-        this.position = position;
-    }
+    /**
+     * Init position by data source.
+     *
+     * @param dataSource data source
+     * @return position
+     * @throws SQLException SQL exception
+     */
+    T init(DataSource dataSource) throws SQLException;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
similarity index 50%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
index fe0ed84..33717c3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
@@ -17,43 +17,33 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import javax.sql.DataSource;
+import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
 
 /**
- * Position manager factory.
+ * Position initializer factory.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PositionManagerFactory {
+public final class PositionInitializerFactory {
     
     /**
-     * New instance of position manager.
+     * New instance of position initializer.
      *
      * @param databaseType database type
-     * @param dataSource data source
-     * @return position manager
+     * @return position initializer
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
-        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+    public static PositionInitializer<?> newInstance(final String databaseType) {
+        return ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType).getPositionInitializer().newInstance();
     }
     
     /**
-     * New instance of position manager.
+     * Get position type.
      *
      * @param databaseType database type
-     * @param position position
-     * @return position manager
+     * @return position type
      */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static PositionManager newInstance(final String databaseType, final String position) {
-        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
+    public static Class<?> getPositionClass(final String databaseType) {
+        return ReflectionUtil.getInterfaceGenericClass(ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType).getPositionInitializer());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
deleted file mode 100644
index 8b71ecb..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Abstract resume from break-point manager.
- */
-@Slf4j
-public abstract class AbstractResumeBreakPointManager implements ResumeBreakPointManager, Closeable {
-    
-    private static final Gson GSON = new Gson();
-    
-    @Getter
-    private final Map<String, PositionManager> inventoryPositionManagerMap = Maps.newConcurrentMap();
-    
-    @Getter
-    private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
-    
-    @Getter
-    private boolean resumable;
-    
-    private final String databaseType;
-    
-    private final String taskPath;
-    
-    private final ScheduledExecutorService executor;
-    
-    public AbstractResumeBreakPointManager(final String databaseType, final String taskPath) {
-        this.databaseType = databaseType;
-        this.taskPath = taskPath;
-        resumePosition();
-        executor = Executors.newSingleThreadScheduledExecutor();
-        executor.scheduleWithFixedDelay(this::persistPosition, 1, 1, TimeUnit.MINUTES);
-    }
-    
-    private void resumePosition() {
-        try {
-            resumeInventoryPosition(getInventoryPath());
-            resumeIncrementalPosition(getIncrementalPath());
-            resumable = !inventoryPositionManagerMap.isEmpty() && !incrementalPositionManagerMap.isEmpty();
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("resume position failed.");
-            throw ex;
-        }
-    }
-    
-    protected void resumeInventoryPosition(final String path) {
-        String data = getPosition(path);
-        if (Strings.isNullOrEmpty(data)) {
-            return;
-        }
-        log.info("resume inventory position from {} = {}", taskPath, data);
-        InventoryPositionGroup inventoryPositionGroup = InventoryPositionGroup.fromJson(data);
-        Map<String, Position<?>> unfinished = inventoryPositionGroup.getUnfinished();
-        for (Entry<String, Position<?>> entry : unfinished.entrySet()) {
-            inventoryPositionManagerMap.put(entry.getKey(), new PositionManager(entry.getValue()));
-        }
-        for (String each : inventoryPositionGroup.getFinished()) {
-            inventoryPositionManagerMap.put(each, new PositionManager(new FinishedPosition()));
-        }
-    }
-    
-    protected void resumeIncrementalPosition(final String path) {
-        String data = getPosition(path);
-        if (Strings.isNullOrEmpty(data)) {
-            return;
-        }
-        log.info("resume incremental position from {} = {}", taskPath, data);
-        Map<String, Object> incrementalPosition = GSON.<Map<String, Object>>fromJson(data, Map.class);
-        for (Entry<String, Object> entry : incrementalPosition.entrySet()) {
-            incrementalPositionManagerMap.put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
-        }
-    }
-    
-    @Override
-    public void persistPosition() {
-        try {
-            persistIncrementalPosition();
-            persistInventoryPosition();
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("persist position failed.", ex);
-        }
-    }
-    
-    private void persistInventoryPosition() {
-        InventoryPositionGroup inventoryPositionGroup = new InventoryPositionGroup();
-        for (Entry<String, PositionManager> entry : inventoryPositionManagerMap.entrySet()) {
-            if (entry.getValue().getPosition() instanceof FinishedPosition) {
-                inventoryPositionGroup.getFinished().add(entry.getKey());
-                continue;
-            }
-            inventoryPositionGroup.getUnfinished().put(entry.getKey(), entry.getValue().getPosition());
-        }
-        String data = inventoryPositionGroup.toJson();
-        log.info("persist inventory position {} = {}", getInventoryPath(), data);
-        persistPosition(getInventoryPath(), data);
-    }
-    
-    private void persistIncrementalPosition() {
-        String data = GSON.toJson(incrementalPositionManagerMap.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition())));
-        log.info("persist incremental position {} = {}", getIncrementalPath(), data);
-        persistPosition(getIncrementalPath(), data);
-    }
-    
-    protected String getInventoryPath() {
-        return String.format("%s/%s", taskPath, ScalingConstant.INVENTORY);
-    }
-    
-    protected String getIncrementalPath() {
-        return String.format("%s/%s", taskPath, ScalingConstant.INCREMENTAL);
-    }
-    
-    @Override
-    public void close() {
-        executor.submit((Runnable) this::persistPosition);
-        executor.shutdown();
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
deleted file mode 100644
index 284cbe2..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * File system resume from break-point manager as default.
- */
-public final class FileSystemResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-    
-    public FileSystemResumeBreakPointManager(final String databaseType, final String taskPath) {
-        super(databaseType, taskPath.startsWith("/") ? ".scaling" + taskPath : taskPath);
-    }
-    
-    @Override
-    @SneakyThrows(IOException.class)
-    public String getPosition(final String path) {
-        File file = new File(path);
-        if (!file.exists()) {
-            return null;
-        }
-        return FileUtils.readFileToString(file, StandardCharsets.UTF_8);
-    }
-    
-    @Override
-    @SneakyThrows(IOException.class)
-    public void persistPosition(final String path, final String data) {
-        FileUtils.writeStringToFile(new File(path), data, StandardCharsets.UTF_8);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java
deleted file mode 100644
index 91fedb6..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-
-/**
- * Registry repository resume from break-point manager.
- */
-public final class RegistryRepositoryResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-    
-    private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
-    
-    public RegistryRepositoryResumeBreakPointManager(final String databaseType, final String taskPath) {
-        super(databaseType, taskPath);
-    }
-    
-    @Override
-    public String getPosition(final String path) {
-        return REGISTRY_REPOSITORY.get(path);
-    }
-    
-    @Override
-    public void persistPosition(final String path, final String data) {
-        REGISTRY_REPOSITORY.persist(path, data);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
deleted file mode 100644
index d52f78e..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-
-import java.util.Map;
-
-/**
- * Resume from break-point manager interface.
- */
-public interface ResumeBreakPointManager {
-    
-    /**
-     * If has resumable data.
-     *
-     * @return is resumable
-     */
-    boolean isResumable();
-    
-    /**
-     * Get inventory position map.
-     *
-     * @return inventory position map
-     */
-    Map<String, PositionManager> getInventoryPositionManagerMap();
-    
-    /**
-     * Get incremental position map.
-     *
-     * @return incremental position map
-     */
-    Map<String, PositionManager> getIncrementalPositionManagerMap();
-    
-    /**
-     * Get position.
-     *
-     * @param path path
-     * @return data
-     */
-    String getPosition(String path);
-    
-    /**
-     * Persist position immediately.
-     */
-    void persistPosition();
-    
-    /**
-     * Persist position.
-     *
-     * @param path path
-     * @param data data
-     */
-    void persistPosition(String path, String data);
-    
-    /**
-     * Close this manager.
-     */
-    void close();
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
deleted file mode 100644
index efcd723..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-
-/**
- * Resume from break-point manager factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ResumeBreakPointManagerFactory {
-    
-    private static Class<? extends ResumeBreakPointManager> clazz = FileSystemResumeBreakPointManager.class;
-    
-    static {
-        ShardingSphereServiceLoader.register(RegistryRepository.class);
-        ShardingSphereServiceLoader.register(ConfigurationRepository.class);
-        if (RegistryRepositoryHolder.isAvailable()) {
-            clazz = RegistryRepositoryResumeBreakPointManager.class;
-        }
-    }
-    
-    /**
-     * New resume from break-point manager instance.
-     *
-     * @param databaseType database type
-     * @param taskPath task path for persist data.
-     * @return resume from break-point manager
-     */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static ResumeBreakPointManager newInstance(final String databaseType, final String taskPath) {
-        return clazz.getConstructor(String.class, String.class).newInstance(databaseType, taskPath);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index f8a369f..2013895 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -19,24 +19,21 @@ package org.apache.shardingsphere.scaling.core.job.preparer;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceCheckerFactory;
-import org.apache.shardingsphere.scaling.core.job.preparer.resumer.ScalingPositionResumer;
 import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.utils.ScalingConfigurationUtil;
 
+import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -50,35 +47,26 @@ public final class ScalingJobPreparer {
     
     private final InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter();
     
-    private final ScalingPositionResumer scalingPositionResumer = new ScalingPositionResumer();
-    
     /**
      * Do prepare work for scaling job.
      *
      * @param scalingJob scaling job
      */
     public void prepare(final ScalingJob scalingJob) {
+        ScalingConfigurationUtil.fillInProperties(scalingJob.getScalingConfig());
         try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
-            checkSourceDataSources(scalingJob, dataSourceManager);
-            ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
-            scalingJob.setResumeBreakPointManager(resumeBreakPointManager);
-            if (resumeBreakPointManager.isResumable()) {
-                scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
-            } else {
-                checkTargetDataSources(scalingJob, dataSourceManager);
-                initIncrementalTasks(scalingJob, dataSourceManager);
-                initInventoryTasks(scalingJob, dataSourceManager);
-                scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
-            }
-        } catch (final PrepareFailedException ex) {
+            checkDataSource(scalingJob, dataSourceManager);
+            initIncrementalTasks(scalingJob, dataSourceManager);
+            initInventoryTasks(scalingJob, dataSourceManager);
+        } catch (final PrepareFailedException | SQLException ex) {
             log.error("Preparing scaling job {} failed", scalingJob.getJobId(), ex);
             scalingJob.setStatus(JobStatus.PREPARING_FAILURE.name());
         }
     }
     
-    private ResumeBreakPointManager getResumeBreakPointManager(final ScalingJob scalingJob) {
-        return ResumeBreakPointManagerFactory.newInstance(scalingJob.getDatabaseType(),
-                ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), ScalingConstant.POSITION, scalingJob.getShardingItem()));
+    private void checkDataSource(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+        checkSourceDataSources(scalingJob, dataSourceManager);
+        checkTargetDataSources(scalingJob, dataSourceManager);
     }
     
     private void checkSourceDataSources(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
@@ -96,16 +84,22 @@ public final class ScalingJobPreparer {
     private void initInventoryTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
         List<ScalingTask> allInventoryTasks = new LinkedList<>();
         for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
-            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob.getDatabaseType(), each, dataSourceManager));
+            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob, each, dataSourceManager));
         }
         scalingJob.getInventoryTasks().addAll(allInventoryTasks);
     }
     
-    private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+    private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) throws SQLException {
         for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
-            ScalingDataSourceConfiguration dataSourceConfig = each.getDumperConfig().getDataSourceConfig();
-            each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(scalingJob.getDatabaseType(), dataSourceManager.getDataSource(dataSourceConfig)));
+            each.getDumperConfig().setPosition(getIncrementalPosition(scalingJob, each, dataSourceManager));
             scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
         }
     }
+    
+    private Position<?> getIncrementalPosition(final ScalingJob scalingJob, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) throws SQLException {
+        if (null != scalingJob.getInitPosition()) {
+            return scalingJob.getInitPosition().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
+        }
+        return PositionInitializerFactory.newInstance(taskConfig.getJobConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
index fb59dce..2523a1d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
@@ -58,7 +58,7 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
             try (PreparedStatement preparedStatement = dataSource.getConnection().prepareStatement(getSqlBuilder().buildCheckEmptySQL(each));
                  ResultSet resultSet = preparedStatement.executeQuery()) {
                 if (resultSet.next()) {
-                    throw new PrepareFailedException(String.format("Target table [%s] not empty!", each));
+                    throw new PrepareFailedException(String.format("Target table [%s] is not empty!", each));
                 }
             }
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java
deleted file mode 100644
index ef9cb5a..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
-
-import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
-import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-/**
- * Scaling position resumer.
- */
-public final class ScalingPositionResumer {
-    
-    private final ScalingTaskFactory scalingTaskFactory = new DefaultScalingTaskFactory();
-    
-    /**
-     * Resume position from resume from break-point manager.
-     *
-     * @param scalingJob scaling job
-     * @param dataSourceManager dataSource manager
-     * @param resumeBreakPointManager resume from break-point manager
-     */
-    public void resumePosition(final ScalingJob scalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
-        resumeInventoryPosition(scalingJob, dataSourceManager, resumeBreakPointManager);
-        resumeIncrementalPosition(scalingJob, resumeBreakPointManager);
-    }
-    
-    private void resumeInventoryPosition(final ScalingJob scalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
-        scalingJob.getInventoryTasks().addAll(getAllInventoryTasks(scalingJob, dataSourceManager, resumeBreakPointManager));
-    }
-    
-    private List<ScalingTask> getAllInventoryTasks(final ScalingJob scalingJob,
-                                                                          final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
-        List<ScalingTask> result = new LinkedList<>();
-        for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
-            MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfig().getDataSourceConfig()));
-            for (Entry<String, PositionManager> entry : getInventoryPositionMap(each.getDumperConfig(), resumeBreakPointManager).entrySet()) {
-                result.add(scalingTaskFactory.createInventoryTask(newInventoryDumperConfig(each.getDumperConfig(), metaDataManager, entry), each.getImporterConfig()));
-            }
-        }
-        return result;
-    }
-    
-    private InventoryDumperConfiguration newInventoryDumperConfig(final DumperConfiguration dumperConfig, final MetaDataManager metaDataManager, final Entry<String, PositionManager> entry) {
-        String[] splitTable = entry.getKey().split("#");
-        InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
-        result.setTableName(splitTable[0].split("\\.")[1]);
-        result.setPositionManager(entry.getValue());
-        if (2 == splitTable.length) {
-            result.setShardingItem(Integer.parseInt(splitTable[1]));
-        }
-        result.setPrimaryKey(metaDataManager.getTableMetaData(result.getTableName()).getPrimaryKeyColumns().get(0));
-        return result;
-    }
-    
-    private Map<String, PositionManager> getInventoryPositionMap(final DumperConfiguration dumperConfig, final ResumeBreakPointManager resumeBreakPointManager) {
-        Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfig.getDataSourceName()));
-        return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
-                .filter(entry -> pattern.matcher(entry.getKey()).find())
-                .collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
-    }
-
-    private void resumeIncrementalPosition(final ScalingJob scalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
-        for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
-            each.getDumperConfig().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfig().getDataSourceName()));
-            scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
-        }
-    }
-    
-    /**
-     * Persist position.
-     *
-     * @param scalingJob scaling job
-     * @param resumeBreakPointManager resume from break-point manager
-     */
-    public void persistPosition(final ScalingJob scalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
-        for (ScalingTask each : scalingJob.getInventoryTasks()) {
-            resumeBreakPointManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
-        }
-        for (ScalingTask each : scalingJob.getIncrementalTasks()) {
-            resumeBreakPointManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
-        }
-        resumeBreakPointManager.persistPosition();
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
index c4df3f9..a948dd5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
 
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
@@ -24,10 +25,10 @@ import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguratio
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
@@ -40,6 +41,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,30 +57,26 @@ public final class InventoryTaskSplitter {
     /**
      * Split inventory data to multi-tasks.
      *
-     * @param databaseType database type
+     * @param scalingJob scaling job
      * @param taskConfig task configuration
      * @param dataSourceManager data source manager
      * @return split inventory data task
      */
-    public Collection<ScalingTask> splitInventoryData(final String databaseType, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) {
+    public Collection<ScalingTask> splitInventoryData(final ScalingJob scalingJob, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) {
         Collection<ScalingTask> result = new LinkedList<>();
-        for (InventoryDumperConfiguration each : splitDumperConfig(databaseType, taskConfig.getJobConfig().getShardingSize(), taskConfig.getDumperConfig(), dataSourceManager)) {
+        for (InventoryDumperConfiguration each : splitDumperConfig(scalingJob, taskConfig.getDumperConfig(), dataSourceManager)) {
             result.add(scalingTaskFactory.createInventoryTask(each, taskConfig.getImporterConfig()));
         }
         return result;
     }
     
     private Collection<InventoryDumperConfiguration> splitDumperConfig(
-            final String databaseType, final int shardingSize, final DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
+            final ScalingJob scalingJob, final DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         MetaDataManager metaDataManager = new MetaDataManager(dataSource);
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
-            if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
-                result.addAll(splitByPrimaryKeyRange(databaseType, shardingSize, each, metaDataManager, dataSource));
-            } else {
-                result.add(each);
-            }
+            result.addAll(splitByPrimaryKey(scalingJob, dataSource, metaDataManager, each));
         }
         return result;
     }
@@ -88,30 +86,59 @@ public final class InventoryTaskSplitter {
         dumperConfig.getTableNameMap().forEach((key, value) -> {
             InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
             inventoryDumperConfig.setTableName(key);
-            inventoryDumperConfig.setPositionManager(new PositionManager(new PlaceholderPosition()));
+            inventoryDumperConfig.setPosition(new PlaceholderPosition());
             result.add(inventoryDumperConfig);
         });
         return result;
     }
     
-    private boolean isSpiltByPrimaryKeyRange(final InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager metaDataManager) {
-        TableMetaData tableMetaData = metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName());
+    private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
+            final ScalingJob scalingJob, final DataSource dataSource, final MetaDataManager metaDataManager, final InventoryDumperConfiguration dumperConfig) {
+        Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+        Collection<Position<?>> inventoryPositions = getInventoryPositions(scalingJob, dumperConfig, dataSource, metaDataManager);
+        int i = 0;
+        for (Position<?> inventoryPosition : inventoryPositions) {
+            InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
+            splitDumperConfig.setPosition(inventoryPosition);
+            splitDumperConfig.setShardingItem(i++);
+            splitDumperConfig.setTableName(dumperConfig.getTableName());
+            splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+            result.add(splitDumperConfig);
+        }
+        return result;
+    }
+    
+    private Collection<Position<?>> getInventoryPositions(
+            final ScalingJob scalingJob, final InventoryDumperConfiguration dumperConfig, final DataSource dataSource, final MetaDataManager metaDataManager) {
+        if (null != scalingJob.getInitPosition()) {
+            return scalingJob.getInitPosition().getInventoryPosition(dumperConfig.getTableName()).values();
+        }
+        if (isSpiltByPrimaryKeyRange(metaDataManager, dumperConfig.getTableName())) {
+            String primaryKey = metaDataManager.getTableMetaData(dumperConfig.getTableName()).getPrimaryKeyColumns().get(0);
+            dumperConfig.setPrimaryKey(primaryKey);
+            return getPositionByPrimaryKeyRange(scalingJob, dataSource, dumperConfig);
+        }
+        return Lists.newArrayList(new PlaceholderPosition());
+    }
+    
+    private boolean isSpiltByPrimaryKeyRange(final MetaDataManager metaDataManager, final String tableName) {
+        TableMetaData tableMetaData = metaDataManager.getTableMetaData(tableName);
         if (null == tableMetaData) {
-            log.warn("Can't split range for table {}, reason: can not get table metadata ", inventoryDumperConfig.getTableName());
+            log.warn("Can't split range for table {}, reason: can not get table metadata ", tableName);
             return false;
         }
         List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
         if (null == primaryKeys || primaryKeys.isEmpty()) {
-            log.warn("Can't split range for table {}, reason: no primary key", inventoryDumperConfig.getTableName());
+            log.warn("Can't split range for table {}, reason: no primary key", tableName);
             return false;
         }
         if (primaryKeys.size() > 1) {
-            log.warn("Can't split range for table {}, reason: primary key is union primary", inventoryDumperConfig.getTableName());
+            log.warn("Can't split range for table {}, reason: primary key is union primary", tableName);
             return false;
         }
         int index = tableMetaData.findColumnIndex(primaryKeys.get(0));
         if (isNotIntegerPrimary(tableMetaData.getColumnMetaData(index).getDataType())) {
-            log.warn("Can't split range for table {}, reason: primary key is not integer number", inventoryDumperConfig.getTableName());
+            log.warn("Can't split range for table {}, reason: primary key is not integer number", tableName);
             return false;
         }
         return true;
@@ -121,36 +148,28 @@ public final class InventoryTaskSplitter {
         return Types.INTEGER != columnType && Types.BIGINT != columnType && Types.SMALLINT != columnType && Types.TINYINT != columnType;
     }
     
-    private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(
-            final String databaseType, final int shardingSize, final InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager metaDataManager, final DataSource dataSource) {
-        Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        String tableName = inventoryDumperConfig.getTableName();
-        String primaryKey = metaDataManager.getTableMetaData(tableName).getPrimaryKeyColumns().get(0);
-        ScalingSQLBuilder scalingSqlBuilder = ScalingSQLBuilderFactory.newInstance(databaseType);
-        inventoryDumperConfig.setPrimaryKey(primaryKey);
+    private Collection<Position<?>> getPositionByPrimaryKeyRange(final ScalingJob scalingJob, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
+        Collection<Position<?>> result = new ArrayList<>();
+        String sql = ScalingSQLBuilderFactory.newInstance(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType())
+                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
         try (Connection connection = dataSource.getConnection();
-             PreparedStatement ps = connection.prepareStatement(scalingSqlBuilder.buildSplitByPrimaryKeyRangeSQL(tableName, primaryKey))) {
+             PreparedStatement ps = connection.prepareStatement(sql)) {
             long beginId = 0;
             for (int i = 0; i < Integer.MAX_VALUE; i++) {
                 ps.setLong(1, beginId);
-                ps.setLong(2, shardingSize);
+                ps.setLong(2, scalingJob.getScalingConfig().getJobConfiguration().getShardingSize());
                 try (ResultSet rs = ps.executeQuery()) {
                     rs.next();
                     long endId = rs.getLong(1);
                     if (endId == 0) {
                         break;
                     }
-                    InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(inventoryDumperConfig);
-                    splitDumperConfig.setPositionManager(new PositionManager(new PrimaryKeyPosition(beginId, endId)));
-                    splitDumperConfig.setShardingItem(i);
-                    splitDumperConfig.setPrimaryKey(primaryKey);
-                    splitDumperConfig.setTableName(tableName);
-                    result.add(splitDumperConfig);
+                    result.add(new PrimaryKeyPosition(beginId, endId));
                     beginId = endId + 1;
                 }
             }
         } catch (final SQLException ex) {
-            throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", inventoryDumperConfig.getTableName(), primaryKey), ex);
+            throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getTableName(), dumperConfig.getPrimaryKey()), ex);
         }
         return result;
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 68344f3..426889d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task;
 
 import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 /**
  * Scaling task interface.
@@ -34,11 +34,11 @@ public interface ScalingTask extends ScalingExecutor {
     TaskProgress getProgress();
     
     /**
-     * Get position manager.
+     * Get position.
      *
-     * @return position manager
+     * @return position
      */
-    PositionManager getPositionManager();
+    Position<?> getPosition();
     
     /**
      * Get task id.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
index a5aa5de..788c0bc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.job.task.incremental;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
@@ -33,6 +34,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.ArrayList;
@@ -57,6 +59,12 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
     
     private Dumper dumper;
     
+    @Getter
+    private final String taskId;
+    
+    @Getter
+    private Position<?> position;
+    
     private long delayMillisecond = Long.MAX_VALUE;
     
     public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig) {
@@ -64,13 +72,13 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
         this.dumperConfig = dumperConfig;
         this.importerConfig = importerConfig;
         dataSourceManager = new DataSourceManager();
-        setTaskId(dumperConfig.getDataSourceName());
-        setPositionManager(dumperConfig.getPositionManager());
+        taskId = dumperConfig.getDataSourceName();
+        position = dumperConfig.getPosition();
     }
     
     @Override
     public void start() {
-        dumper = DumperFactory.newInstanceLogDumper(dumperConfig, getPositionManager().getPosition());
+        dumper = DumperFactory.newInstanceLogDumper(dumperConfig, position);
         Collection<Importer> importers = instanceImporters();
         instanceChannel(importers);
         Future<?> future = ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submitAll(importers, new ExecuteCallback() {
@@ -102,7 +110,7 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
         DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
             if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
-                getPositionManager().setPosition(lastHandledRecord.getPosition());
+                position = lastHandledRecord.getPosition();
             }
             delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
         });
@@ -117,7 +125,7 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
             future.get();
         } catch (final InterruptedException ignored) {
         } catch (final ExecutionException ex) {
-            throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), ex.getCause());
+            throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", taskId), ex.getCause());
         }
     }
     
@@ -131,6 +139,6 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
     
     @Override
     public TaskProgress getProgress() {
-        return new IncrementalTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getPosition());
+        return new IncrementalTaskProgress(taskId, delayMillisecond, position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
index 1ecc14e..1c88d19 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.job.task.inventory;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.Optional;
@@ -54,6 +56,12 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
     
     private Dumper dumper;
     
+    @Getter
+    private final String taskId;
+    
+    @Getter
+    private Position<?> position;
+    
     public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig) {
         this(inventoryDumperConfig, importerConfig, new DataSourceManager());
     }
@@ -62,8 +70,8 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
         this.inventoryDumperConfig = inventoryDumperConfig;
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
-        setTaskId(generateTaskId(inventoryDumperConfig));
-        setPositionManager(inventoryDumperConfig.getPositionManager());
+        taskId = generateTaskId(inventoryDumperConfig);
+        position = inventoryDumperConfig.getPosition();
     }
     
     private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
@@ -100,7 +108,7 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
     private void instanceChannel(final Importer importer) {
         MemoryChannel channel = new MemoryChannel(records -> {
             Optional<Record> record = records.stream().filter(each -> !(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
-            record.ifPresent(value -> getPositionManager().setPosition(value.getPosition()));
+            record.ifPresent(value -> position = value.getPosition());
         });
         dumper.setChannel(channel);
         importer.setChannel(channel);
@@ -111,7 +119,7 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
             future.get();
         } catch (final InterruptedException ignored) {
         } catch (final ExecutionException ex) {
-            throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), ex.getCause());
+            throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", taskId), ex.getCause());
         }
     }
     
@@ -125,6 +133,6 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
     
     @Override
     public TaskProgress getProgress() {
-        return new InventoryTaskProgress(getTaskId(), getPositionManager().getPosition() instanceof FinishedPosition);
+        return new InventoryTaskProgress(taskId, position instanceof FinishedPosition);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 125717d..f16b71f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,7 +62,6 @@ public final class ScalingTaskScheduler implements Runnable {
             log.info("stop incremental task {} - {}", scalingJob.getJobId(), each.getTaskId());
             each.stop();
         }
-        scalingJob.getResumeBreakPointManager().close();
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 247895c..32658fe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 
 /**
@@ -46,11 +46,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
     Class<? extends LogDumper> getLogDumperClass();
     
     /**
-     * Get position manager type.
+     * Get position initializer type.
      *
-     * @return position manager type
+     * @return position initializer type
      */
-    Class<? extends PositionManager> getPositionManager();
+    Class<? extends PositionInitializer> getPositionInitializer();
     
     /**
      * Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
index 3202cd5..160f686 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.utils;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 
 /**
@@ -36,14 +36,14 @@ public final class RdbmsConfigurationUtil {
      * @return SQL where condition
      */
     public static String getWhereCondition(final InventoryDumperConfiguration inventoryDumperConfig) {
-        return getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPositionManager());
+        return getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPosition());
     }
     
-    private static String getWhereCondition(final String primaryKey, final PositionManager positionManager) {
-        if (null == primaryKey || null == positionManager) {
+    private static String getWhereCondition(final String primaryKey, final Position<?> position) {
+        if (null == primaryKey || null == position) {
             return "";
         }
-        PrimaryKeyPosition position = (PrimaryKeyPosition) positionManager.getPosition();
-        return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
+        PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
+        return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
index 078f0a9..5c9b954 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.scaling.core.utils;
 
+import com.google.common.base.Preconditions;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -153,4 +157,16 @@ public final class ReflectionUtil {
         method.setAccessible(true);
         method.invoke(target, parameterValues);
     }
+    
+    /**
+     * Get interface generic class.
+     *
+     * @param clazz class
+     * @return generic class
+     */
+    public static Class<?> getInterfaceGenericClass(final Class<?> clazz) {
+        Type[] types = clazz.getGenericInterfaces();
+        Preconditions.checkState(types.length == 1, "Only supported one generic type");
+        return (Class<?>) ((ParameterizedType) types[0]).getActualTypeArguments()[0];
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
new file mode 100644
index 0000000..2eff56d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Scaling configuration util.
+ */
+public final class ScalingConfigurationUtil {
+    
+    private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR = initIdAutoIncreaseGenerator();
+    
+    private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
+        SnowflakeKeyGenerateAlgorithm result = new SnowflakeKeyGenerateAlgorithm();
+        result.init();
+        return result;
+    }
+    
+    private static Long generateKey() {
+        return (Long) ID_AUTO_INCREASE_GENERATOR.generateKey();
+    }
+    
+    /**
+     * Fill in properties for scaling config.
+     *
+     * @param scalingConfig scaling config
+     */
+    public static void fillInProperties(final ScalingConfiguration scalingConfig) {
+        JobConfiguration jobConfig = scalingConfig.getJobConfiguration();
+        if (null == jobConfig.getJobId()) {
+            jobConfig.setJobId(generateKey());
+        }
+        if (Strings.isNullOrEmpty(jobConfig.getDatabaseType())) {
+            jobConfig.setDatabaseType(scalingConfig.getRuleConfiguration().getSource().unwrap().getDatabaseType().getName());
+        }
+        if (null == scalingConfig.getJobConfiguration().getShardingTables()) {
+            jobConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(scalingConfig)));
+        }
+    }
+    
+    private static List<String> getShouldScalingActualDataNodes(final ScalingConfiguration scalingConfig) {
+        ScalingDataSourceConfiguration sourceConfig = scalingConfig.getRuleConfiguration().getSource().unwrap();
+        Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
+                "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
+        ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
+        if (!(scalingConfig.getRuleConfiguration().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
+            return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+        }
+        ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+        return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSource(), target.getDataSource()),
+                getShardingRuleConfigMap(source.getRule()), getShardingRuleConfigMap(target.getRule()));
+    }
+    
+    private static List<String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
+                                                                final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
+                                                                final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
+        List<String> result = new ArrayList<>();
+        newShardingRuleConfigMap.keySet().forEach(each -> {
+            if (!oldShardingRuleConfigMap.containsKey(each)) {
+                return;
+            }
+            List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+            List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+            if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
+                result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
+            }
+        });
+        return result;
+    }
+    
+    private static Set<String> getModifiedDataSources(final String oldConfig, final String newConfig) {
+        Set<String> result = new HashSet<>();
+        Map<String, String> oldDataSourceUrlMap = getDataSourceUrlMap(oldConfig);
+        Map<String, String> newDataSourceUrlMap = getDataSourceUrlMap(newConfig);
+        newDataSourceUrlMap.forEach((key, value) -> {
+            if (!value.equals(oldDataSourceUrlMap.get(key))) {
+                result.add(key);
+            }
+        });
+        return result;
+    }
+    
+    private static Map<String, String> getDataSourceUrlMap(final String configuration) {
+        Map<String, String> result = new HashMap<>();
+        ConfigurationYamlConverter.loadDataSourceConfigs(configuration).forEach((key, value) -> {
+            JdbcUri uri = new JdbcUri(value.getProps().getOrDefault("url", value.getProps().get("jdbcUrl")).toString());
+            result.put(key, String.format("%s/%s", uri.getHost(), uri.getDatabase()));
+        });
+        return result;
+    }
+    
+    private static boolean includeModifiedDataSources(final List<String> actualDataNodes, final Set<String> modifiedDataSources) {
+        return actualDataNodes.stream().anyMatch(each -> modifiedDataSources.contains(each.split("\\.")[0]));
+    }
+    
+    private static Map<String, ShardingTableRuleConfiguration> getShardingRuleConfigMap(final String configuration) {
+        ShardingRuleConfiguration oldShardingRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(configuration);
+        return oldShardingRuleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
+    }
+    
+    private static String[] groupByDataSource(final List<String> actualDataNodeList) {
+        List<String> result = new ArrayList<>();
+        Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
+        for (String key : multiMap.keySet()) {
+            List<String> list = new ArrayList<>();
+            for (String value : multiMap.get(key)) {
+                list.add(String.format("%s.%s", key, value));
+            }
+            result.add(String.join(",", list));
+        }
+        return result.toArray(new String[0]);
+    }
+    
+    private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
+        Multimap<String, String> result = HashMultimap.create();
+        for (String actualDataNodes : actualDataNodeList) {
+            for (String actualDataNode : actualDataNodes.split(",")) {
+                String[] nodeArray = split(actualDataNode);
+                for (String dataSource : new InlineExpressionParser(nodeArray[0]).splitAndEvaluate()) {
+                    result.put(dataSource, nodeArray[1]);
+                }
+            }
+        }
+        return result;
+    }
+    
+    private static String[] split(final String actualDataNode) {
+        boolean flag = true;
+        int i = 0;
+        for (; i < actualDataNode.length(); i++) {
+            char each = actualDataNode.charAt(i);
+            if (each == '{') {
+                flag = false;
+            } else if (each == '}') {
+                flag = true;
+            } else if (flag && each == '.') {
+                break;
+            }
+        }
+        return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index b15dec6..5597050 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -38,7 +38,7 @@ public final class ScalingTaskUtil {
      * @return is finished
      */
     public static boolean allInventoryTasksFinished(final Collection<ScalingTask> inventoryTasks) {
-        return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPositionManager().getPosition() instanceof FinishedPosition);
+        return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPosition() instanceof FinishedPosition);
     }
     
     /**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index f834cbc..2effedc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
@@ -39,8 +39,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
-        return FixturePositionManager.class;
+    public Class<? extends PositionInitializer> getPositionInitializer() {
+        return FixturePositionInitializer.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
similarity index 77%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
index 93421b4..ac58683 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
@@ -18,17 +18,14 @@
 package org.apache.shardingsphere.scaling.core.fixture;
 
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 
 import javax.sql.DataSource;
 
-public final class FixturePositionManager extends PositionManager {
+public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
     
-    public FixturePositionManager(final DataSource dataSource) {
-        super(new PlaceholderPosition());
-    }
-    
-    public FixturePositionManager(final String position) {
-        super(new PlaceholderPosition());
+    @Override
+    public PlaceholderPosition init(final DataSource dataSource) {
+        return new PlaceholderPosition();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java
deleted file mode 100644
index b808073..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.fixture;
-
-import org.apache.shardingsphere.scaling.core.job.position.resume.AbstractResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-
-public final class FixtureResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-    
-    public FixtureResumeBreakPointManager(final String databaseType, final String taskPath) {
-        super(databaseType, taskPath);
-    }
-    
-    @Override
-    public String getPosition(final String path) {
-        return null;
-    }
-    
-    @Override
-    public void persistPosition(final String path, final String data) {
-    
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
deleted file mode 100644
index e06bbbb..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class AbstractResumeBreakPointManagerTest {
-    
-    private AbstractResumeBreakPointManager resumeBreakPointManager;
-    
-    @Test
-    public void assertResumeInventoryPosition() {
-        Map<String, String> dataMap = new HashMap<>();
-        dataMap.put("/base/inventory", "{\"unfinished\":{\"ds1.t_order_1#0\":[0,200],\"ds0.t_order_1#0\":[0,100],\"ds0.t_order_2\":[]},\"finished\":[\"ds0.t_order_1#1\"]}");
-        resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
-        resumeBreakPointManager.resumeInventoryPosition("");
-        assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), is(0));
-        resumeBreakPointManager.resumeInventoryPosition("/base/inventory");
-        assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), is(4));
-    }
-    
-    @Test
-    public void assertResumeIncrementalPosition() {
-        Map<String, String> dataMap = new HashMap<>();
-        dataMap.put("/base/incremental", "{\"ds0\":[],\"ds1\":[]}");
-        resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
-        resumeBreakPointManager.resumeIncrementalPosition("");
-        assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), is(0));
-        resumeBreakPointManager.resumeIncrementalPosition("/base/incremental");
-        assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), is(2));
-    }
-    
-    @Test
-    public void assertPersistInventoryPosition() {
-        Map<String, String> dataMap = new HashMap<>();
-        dataMap.put("/base/inventory", "{\"unfinished\":{\"ds0.t_order_2#0\":[],\"ds0.t_order_1#0\":[0,100]},\"finished\":[\"ds0.t_order_1#1\"]}");
-        dataMap.put("/base/incremental", "{}");
-        resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PositionManager(new PrimaryKeyPosition(0L, 100L)));
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PositionManager(new FinishedPosition()));
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2#0", new PositionManager(new PlaceholderPosition()));
-        resumeBreakPointManager.persistPosition();
-    }
-    
-    @Test
-    public void assertPersistIncrementalPosition() {
-        Map<String, String> dataMap = new HashMap<>();
-        dataMap.put("/base/inventory", "{\"unfinished\":{},\"finished\":[]}");
-        dataMap.put("/base/incremental", "{\"ds0\":[],\"ds1\":[]}");
-        resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new PositionManager(new PlaceholderPosition()));
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new PositionManager(new PlaceholderPosition()));
-        resumeBreakPointManager.persistPosition();
-    }
-    
-    @SneakyThrows(ReflectiveOperationException.class)
-    private AbstractResumeBreakPointManager mockResumeBreakPointManager(final Map<String, String> dataMap) {
-        AbstractResumeBreakPointManager result = new AbstractResumeBreakPointManager("H2", "/base") {
-            
-            @Override
-            public String getPosition(final String path) {
-                return dataMap.get(path);
-            }
-            
-            @Override
-            public void persistPosition(final String path, final String data) {
-                assertThat(data, is(dataMap.get(path)));
-            }
-        };
-        ReflectionUtil.setFieldValue(AbstractResumeBreakPointManager.class, result, "inventoryPositionManagerMap", new TreeMap<String, PositionManager>());
-        ReflectionUtil.setFieldValue(AbstractResumeBreakPointManager.class, result, "incrementalPositionManagerMap", new TreeMap<String, PositionManager>());
-        return result;
-    }
-    
-    @After
-    public void tearDown() {
-        resumeBreakPointManager.close();
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java
deleted file mode 100644
index 1a29ee9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class FileSystemResumeBreakPointManagerTest {
-    
-    private FileSystemResumeBreakPointManager resumeBreakPointManager;
-    
-    @Before
-    public void setUp() {
-        resumeBreakPointManager = new FileSystemResumeBreakPointManager("H2", "target/.scaling");
-    }
-    
-    @Test
-    public void assertPersistAndGetPosition() {
-        resumeBreakPointManager.persistPosition();
-        assertThat(resumeBreakPointManager.getPosition("target/.scaling/incremental"), is("{}"));
-        assertThat(resumeBreakPointManager.getPosition("target/.scaling/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
-    }
-    
-    @After
-    @SneakyThrows(IOException.class)
-    public void tearDown() {
-        resumeBreakPointManager.close();
-        FileUtils.forceDeleteOnExit(new File("target/.scaling"));
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java
deleted file mode 100644
index a477b48..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class RegistryRepositoryResumeBreakPointManagerTest {
-    
-    private RegistryRepositoryResumeBreakPointManager resumeBreakPointManager;
-    
-    @Before
-    public void setUp() {
-        ScalingContext.getInstance().init(mockServerConfiguration());
-        resumeBreakPointManager = new RegistryRepositoryResumeBreakPointManager("H2", "/base");
-    }
-    
-    @Test
-    public void assertPersistAndGetPosition() {
-        resumeBreakPointManager.persistPosition();
-        assertThat(resumeBreakPointManager.getPosition("/base/incremental"), is("{}"));
-        assertThat(resumeBreakPointManager.getPosition("/base/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
-    }
-    
-    @After
-    public void tearDown() {
-        resumeBreakPointManager.close();
-        resetRegistryRepositoryAvailable();
-    }
-    
-    private ServerConfiguration mockServerConfiguration() {
-        resetRegistryRepositoryAvailable();
-        ServerConfiguration result = new ServerConfiguration();
-        result.setDistributedScalingService(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("REG_FIXTURE", "", null), false));
-        return result;
-    }
-    
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void resetRegistryRepositoryAvailable() {
-        ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
deleted file mode 100644
index a8cd2bd..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public final class ScalingPositionResumerTest {
-    
-    private ScalingJob scalingJob;
-    
-    private ScalingPositionResumer scalingPositionResumer;
-    
-    @Before
-    public void setUp() {
-        ScalingContext.getInstance().init(new ServerConfiguration());
-        scalingJob = mockScalingJob();
-        scalingPositionResumer = new ScalingPositionResumer();
-    }
-    
-    @Test
-    public void assertResumePosition() {
-        ResumeBreakPointManager resumeBreakPointManager = ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PositionManager(new PrimaryKeyPosition(0, 100)));
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new PositionManager(new PlaceholderPosition()));
-        scalingPositionResumer.resumePosition(scalingJob, new DataSourceManager(), resumeBreakPointManager);
-        assertThat(scalingJob.getIncrementalTasks().size(), is(1));
-        assertTrue(scalingJob.getInventoryTasks().isEmpty());
-    }
-    
-    @Test
-    public void assertPersistPosition() {
-        ResumeBreakPointManager resumeBreakPointManager = mock(ResumeBreakPointManager.class);
-        scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
-        verify(resumeBreakPointManager).persistPosition();
-    }
-    
-    @SneakyThrows(IOException.class)
-    private ScalingJob mockScalingJob() {
-        return ScalingConfigurationUtil.initJob("/config.json");
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
index b48cc91..8d7f27f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
@@ -17,20 +17,23 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import javax.sql.DataSource;
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -51,7 +54,7 @@ public final class InventoryTaskSplitterTest {
     
     private static final String PASSWORD = "password";
     
-    private static final String DATABASE_TYPE = "H2";
+    private ScalingJob scalingJob;
     
     private TaskConfiguration taskConfig;
     
@@ -61,9 +64,7 @@ public final class InventoryTaskSplitterTest {
     
     @Before
     public void setUp() {
-        DumperConfiguration dumperConfig = mockDumperConfig();
-        ImporterConfiguration importerConfig = new ImporterConfiguration();
-        taskConfig = new TaskConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
+        scalingJob = mockScalingJob();
         dataSourceManager = new DataSourceManager();
         inventoryTaskSplitter = new InventoryTaskSplitter();
     }
@@ -77,17 +78,17 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
         taskConfig.getJobConfig().setShardingSize(10);
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<ScalingTask> actual = (List<ScalingTask>) inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+        List<ScalingTask> actual = (List<ScalingTask>) inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(10));
-        assertThat(((PrimaryKeyPosition) actual.get(9).getPositionManager().getPosition()).getBeginValue(), is(91L));
-        assertThat(((PrimaryKeyPosition) actual.get(9).getPositionManager().getPosition()).getEndValue(), is(100L));
+        assertThat(((PrimaryKeyPosition) actual.get(9).getPosition()).getBeginValue(), is(91L));
+        assertThat(((PrimaryKeyPosition) actual.get(9).getPosition()).getEndValue(), is(100L));
     }
     
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(taskConfig.getDumperConfig());
-        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -95,7 +96,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
-        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -103,7 +104,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
-        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+        Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -150,6 +151,15 @@ public final class InventoryTaskSplitterTest {
         }
     }
     
+    @SneakyThrows(IOException.class)
+    private ScalingJob mockScalingJob() {
+        ScalingJob result = ScalingConfigurationUtil.initJob("/config.json");
+        result.getScalingConfig().getJobConfiguration().setDatabaseType("H2");
+        result.getScalingConfig().getJobConfiguration().setShardingSize(10);
+        taskConfig = new TaskConfiguration(result.getScalingConfig().getJobConfiguration(), mockDumperConfig(), new ImporterConfiguration());
+        return result;
+    }
+    
     private DumperConfiguration mockDumperConfig() {
         ScalingDataSourceConfiguration dataSourceConfig = new StandardJDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
         DumperConfiguration result = new DumperConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
index 7ca5243..93e53bd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
@@ -19,13 +19,12 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
 
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -82,7 +81,7 @@ public final class IncrementalTaskTest {
         Map<String, String> tableMap = new HashMap<>(1, 1);
         tableMap.put("t_order", "t_order");
         result.setTableNameMap(tableMap);
-        result.setPositionManager(new PositionManager(new PlaceholderPosition()));
+        result.setPosition(new PlaceholderPosition());
         return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
index dd588b2..34f5fea 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
@@ -21,14 +21,13 @@ import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.ScalingTaskExecuteException;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.junit.After;
 import org.junit.Before;
@@ -81,7 +80,7 @@ public final class InventoryTaskTest {
         initTableData(taskConfig.getDumperConfig());
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
         inventoryDumperConfig.setTableName("t_order");
-        inventoryDumperConfig.setPositionManager(taskConfig.getDumperConfig().getPositionManager());
+        inventoryDumperConfig.setPosition(taskConfig.getDumperConfig().getPosition());
         InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), dataSourceManager);
         inventoryTask.start();
         assertFalse(((InventoryTaskProgress) inventoryTask.getProgress()).isFinished());
@@ -101,7 +100,7 @@ public final class InventoryTaskTest {
         ScalingDataSourceConfiguration dataSourceConfig = new StandardJDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceConfig(dataSourceConfig);
-        result.setPositionManager(new PositionManager(new PrimaryKeyPosition(1, 100)));
+        result.setPosition(new PrimaryKeyPosition(1, 100));
         result.setTableNameMap(Collections.emptyMap());
         return result;
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index 3b9d490..dd8b98c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -25,18 +25,14 @@ import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourc
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
-import org.apache.shardingsphere.scaling.core.fixture.FixtureResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
-import org.apache.shardingsphere.scaling.core.job.position.resume.FileSystemResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
 import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
 import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -75,7 +71,6 @@ public final class StandaloneScalingJobServiceTest {
     public void setUp() {
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
         ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(TaskExecuteEngine.class));
-        ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FixtureResumeBreakPointManager.class);
     }
     
     @Test
@@ -163,10 +158,4 @@ public final class StandaloneScalingJobServiceTest {
             return resultSet.getLong(1);
         }
     }
-    
-    @After
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void tearDown() {
-        ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FileSystemResumeBreakPointManager.class);
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 092b4df..8eeffad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLBinlogDumper;
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.scaling.mysql.component.MySQLDataConsistencyChe
 import org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
-import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionInitializer;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
 
 /**
@@ -49,8 +49,8 @@ public final class MySQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
-        return MySQLPositionManager.class;
+    public Class<? extends PositionInitializer> getPositionInitializer() {
+        return MySQLPositionInitializer.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
similarity index 63%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
index a52defa..b4eae3e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
@@ -17,10 +17,7 @@
 
 package org.apache.shardingsphere.scaling.mysql.component;
 
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 
 import javax.sql.DataSource;
@@ -30,33 +27,16 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * MySQL binlog position manager.
+ * MySQL binlog position initializer.
  */
-public final class MySQLPositionManager extends PositionManager {
-    
-    public MySQLPositionManager(final DataSource dataSource) {
-        super(dataSource);
-        initPosition();
-    }
-    
-    public MySQLPositionManager(final String position) {
-        super(new Gson().fromJson(position, BinlogPosition.class));
-    }
+public final class MySQLPositionInitializer implements PositionInitializer<BinlogPosition> {
     
     @Override
-    public BinlogPosition getPosition() {
-        Position<?> position = super.getPosition();
-        Preconditions.checkState(null != position, "Unknown position.");
-        return (BinlogPosition) position;
-    }
-    
-    private void initPosition() {
-        try (Connection connection = getDataSource().getConnection()) {
-            BinlogPosition position = getBinlogPosition(connection);
-            position.setServerId(getServerId(connection));
-            setPosition(position);
-        } catch (final SQLException ex) {
-            throw new RuntimeException("init position failed.", ex);
+    public BinlogPosition init(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            BinlogPosition result = getBinlogPosition(connection);
+            result.setServerId(getServerId(connection));
+            return result;
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
index a41ae62..5627109 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.scaling.mysql.component.MySQLDataConsistencyChe
 import org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
-import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionInitializer;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -37,7 +37,7 @@ public final class MySQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("MySQL");
         assertTrue(scalingEntry instanceof MySQLScalingEntry);
-        assertThat(scalingEntry.getPositionManager(), equalTo(MySQLPositionManager.class));
+        assertThat(scalingEntry.getPositionInitializer(), equalTo(MySQLPositionInitializer.class));
         assertThat(scalingEntry.getDataSourceCheckerClass(), equalTo(MySQLDataSourceChecker.class));
         assertThat(scalingEntry.getImporterClass(), equalTo(MySQLImporter.class));
         assertThat(scalingEntry.getJdbcDumperClass(), equalTo(MySQLJdbcDumper.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
similarity index 75%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
index 6ef60ca..4c0fae9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.mysql.component;
 
-import com.google.gson.Gson;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +36,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class MySQLPositionManagerTest {
+public final class MySQLPositionInitializerTest {
     
     private static final String LOG_FILE_NAME = "binlog-000001";
     
@@ -61,30 +60,14 @@ public final class MySQLPositionManagerTest {
     }
     
     @Test
-    public void assertGetCurrentPosition() {
-        MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
-        BinlogPosition actual = mysqlPositionManager.getPosition();
+    public void assertGetCurrentPosition() throws SQLException {
+        MySQLPositionInitializer mySQLPositionInitializer = new MySQLPositionInitializer();
+        BinlogPosition actual = mySQLPositionInitializer.init(dataSource);
         assertThat(actual.getServerId(), is(SERVER_ID));
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
     }
     
-    @Test
-    public void assertInitPositionByJson() {
-        MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(new Gson().toJson(new BinlogPosition(LOG_FILE_NAME, LOG_POSITION)));
-        BinlogPosition actual = mysqlPositionManager.getPosition();
-        assertThat(actual.getFilename(), is(LOG_FILE_NAME));
-        assertThat(actual.getPosition(), is(LOG_POSITION));
-    }
-    
-    @Test
-    public void assertUpdateCurrentPosition() {
-        MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
-        BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID, 0);
-        mysqlPositionManager.setPosition(expected);
-        assertThat(mysqlPositionManager.getPosition(), is(expected));
-    }
-    
     private PreparedStatement mockPositionStatement() throws SQLException {
         PreparedStatement result = mock(PreparedStatement.class);
         ResultSet resultSet = mock(ResultSet.class);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index 6ddec40..e0b0297 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -22,14 +22,14 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSourceChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionInitializer;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
 
@@ -49,8 +49,8 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
-        return PostgreSQLPositionManager.class;
+    public Class<? extends PositionInitializer> getPositionInitializer() {
+        return PostgreSQLPositionInitializer.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
similarity index 74%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 0e92caa..11e84db 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -17,9 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql.component;
 
-import com.google.common.base.Preconditions;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.util.PSQLException;
@@ -31,9 +29,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * PostgreSQL wal position manager.
+ * PostgreSQL wal position initializer.
  */
-public final class PostgreSQLPositionManager extends PositionManager {
+public final class PostgreSQLPositionInitializer implements PositionInitializer<WalPosition> {
     
     public static final String SLOT_NAME = "sharding_scaling";
     
@@ -41,28 +39,11 @@ public final class PostgreSQLPositionManager extends PositionManager {
     
     public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
-    public PostgreSQLPositionManager(final DataSource dataSource) {
-        super(dataSource);
-        initPosition();
-    }
-    
-    public PostgreSQLPositionManager(final String position) {
-        super(new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(position))));
-    }
-    
     @Override
-    public WalPosition getPosition() {
-        Position<?> position = super.getPosition();
-        Preconditions.checkState(null != position, "Unknown position.");
-        return (WalPosition) position;
-    }
-    
-    private void initPosition() {
-        try (Connection connection = getDataSource().getConnection()) {
+    public WalPosition init(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
             createIfNotExists(connection);
-            setPosition(getWalPosition(connection));
-        } catch (final SQLException ex) {
-            throw new RuntimeException("init position failed.", ex);
+            return getWalPosition(connection);
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 635c7d5..4e713af 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -75,7 +75,7 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
         try {
             Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
             DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
-            PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
+            PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
index f9e95ad..d37c32e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataCons
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSourceChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionInitializer;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
 import org.junit.Test;
 
@@ -37,7 +37,7 @@ public final class PostgreSQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("PostgreSQL");
         assertTrue(scalingEntry instanceof PostgreSQLScalingEntry);
-        assertThat(scalingEntry.getPositionManager(), equalTo(PostgreSQLPositionManager.class));
+        assertThat(scalingEntry.getPositionInitializer(), equalTo(PostgreSQLPositionInitializer.class));
         assertThat(scalingEntry.getDataSourceCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
         assertThat(scalingEntry.getDataConsistencyCheckerClass(), equalTo(PostgreSQLDataConsistencyChecker.class));
         assertThat(scalingEntry.getImporterClass(), equalTo(PostgreSQLImporter.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
similarity index 79%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
index d3b00c2..8b0e9a7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLPositionManagerTest {
+public final class PostgreSQLPositionInitializerTest {
     
     private static final String POSTGRESQL_96_LSN = "0/14EFDB8";
     
@@ -66,23 +66,17 @@ public final class PostgreSQLPositionManagerTest {
     }
     
     @Test
-    public void assertInitPositionByJson() {
-        WalPosition actual = new PostgreSQLPositionManager("100").getPosition();
-        assertThat(actual.getLogSequenceNumber().asLong(), is(LogSequenceNumber.valueOf(100L).asLong()));
-    }
-    
-    @Test
     public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        WalPosition actual = new PostgreSQLPositionManager(dataSource).getPosition();
+        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
     @Test
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
-        WalPosition actual = new PostgreSQLPositionManager(dataSource).getPosition();
+        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
@@ -90,18 +84,7 @@ public final class PostgreSQLPositionManagerTest {
     public void assertGetCurrentPositionThrowException() throws SQLException {
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
-        new PostgreSQLPositionManager(dataSource).getPosition();
-    }
-    
-    @Test
-    @SneakyThrows(SQLException.class)
-    public void assertUpdateCurrentPosition() {
-        when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
-        when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        PostgreSQLPositionManager positionManager = new PostgreSQLPositionManager(dataSource);
-        WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
-        positionManager.setPosition(expected);
-        assertThat(positionManager.getPosition(), is(expected));
+        new PostgreSQLPositionInitializer().init(dataSource);
     }
     
     @SneakyThrows(SQLException.class)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
index db1fb2b..107f76a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.scaling.postgresql.component;
 
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.exception.ScalingTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
 import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -86,7 +86,7 @@ public final class PostgreSQLWalDumperTest {
             when(logicalReplication.createPgConnection(jdbcDataSourceConfig)).thenReturn(pgConnection);
             when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
             when(pgConnection.getTimestampUtils()).thenReturn(null);
-            when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
+            when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
             ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
             when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
             when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
index 79b9421..34a6824 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -22,6 +22,7 @@ import com.google.gson.GsonBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -55,12 +56,14 @@ public final class ScalingElasticJob implements SimpleJob {
         scalingConfig.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
         scalingConfig.getJobConfiguration().setJobId(Long.valueOf(shardingContext.getJobName()));
         scalingJob = SCALING_JOB_SERVICE.start(scalingConfig).orElse(null);
+        JobSchedulerCenter.addJob(scalingJob);
     }
     
     private void stopJob(final ShardingContext shardingContext) {
         if (null != scalingJob) {
             log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
             SCALING_JOB_SERVICE.stop(scalingJob.getJobId());
+            JobSchedulerCenter.removeJob(scalingJob);
             scalingJob = null;
         }
     }