You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/07/27 10:01:31 UTC

[shardingsphere] branch master updated: Scaling job Breakpoint resume ability. (#6460)

This is an automated email from the ASF dual-hosted git repository.

kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 90724a6  Scaling job Breakpoint resume ability. (#6460)
90724a6 is described below

commit 90724a69b598b20f9d18f9b733e1d17ef7326807
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Jul 27 18:01:13 2020 +0800

    Scaling job Breakpoint resume ability. (#6460)
    
    * Scaling job Breakpoint resume ability.
    
    * update shardingsphere-orchestration-repository-zookeeper-curator dependency
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../user-manual/shardingsphere-scaling/usage.cn.md |   2 +-
 .../user-manual/shardingsphere-scaling/usage.en.md |   2 +-
 .../scaling/web/HttpServerHandler.java             |   2 +-
 .../scaling/fixture/FixtureH2ScalingEntry.java     |   6 +-
 ...reNopLogManager.java => FixtureNopManager.java} |  12 +-
 .../shardingsphere-scaling-core/pom.xml            |   5 +
 .../scaling/core/config/JobConfiguration.java      |   4 +-
 .../scaling/core/config/RdbmsConfiguration.java    |  17 +--
 .../ResumeConfiguration.java}                      |  15 ++-
 .../scaling/core/config/ServerConfiguration.java   |   5 +-
 .../core/config/utils/RdbmsConfigurationUtil.java  |  49 +++++++
 .../core/config/utils/SyncConfigurationUtil.java   |  11 +-
 .../executor/AbstractShardingScalingExecutor.java  |   9 ++
 .../executor/channel/DistributionChannel.java      |  12 +-
 .../executor/dumper/AbstractJDBCDumper.java        |  18 ++-
 .../execute/executor/dumper/DumperFactory.java     |  12 +-
 .../execute/executor/importer/ImporterFactory.java |   8 +-
 .../core/execute/executor/record/DataRecord.java   |   6 +-
 .../execute/executor/record/FinishedRecord.java    |   6 +-
 .../execute/executor/record/PlaceholderRecord.java |   6 +-
 .../core/execute/executor/record/Record.java       |   4 +-
 .../scaling/core/job/ShardingScalingJob.java       |   2 +
 .../{NopLogPosition.java => NopPosition.java}      |  14 +-
 .../position/{LogPosition.java => Position.java}   |  12 +-
 ...ogPositionManager.java => PositionManager.java} |  11 +-
 ...gerFactory.java => PositionManagerFactory.java} |  25 +++-
 .../core/job/position/PrimaryKeyPosition.java      |  87 ++++++++++++
 ...osition.java => PrimaryKeyPositionManager.java} |  18 ++-
 .../resume/AbstractResumablePositionManager.java   | 150 +++++++++++++++++++++
 .../resume/FakeResumablePositionManager.java}      |  24 +---
 .../position/resume/ResumablePositionManager.java  |  68 ++++++++++
 .../ResumablePositionManagerFactory.java}          |  29 ++--
 .../resume/ZookeeperResumablePositionManager.java  | 104 ++++++++++++++
 .../job/preparer/ShardingScalingJobPreparer.java   |  64 +++++----
 .../job/preparer/resumer/SyncPositionResumer.java  | 140 +++++++++++++++++++
 .../splitter/InventoryDataTaskSplitter.java        |  18 ++-
 .../core/job/preparer/utils/JobPrepareUtil.java    |  52 +++++++
 .../core/job/task/DefaultSyncTaskFactory.java      |   9 +-
 .../scaling/core/job/task/ScalingTask.java         |  15 +++
 .../scaling/core/job/task/SyncTaskFactory.java     |  10 +-
 .../incremental/IncrementalDataScalingTask.java    |  30 ++---
 .../IncrementalDataSyncTaskProgress.java           |   4 +-
 .../task/inventory/InventoryDataScalingTask.java   |  42 +++---
 .../inventory/InventoryDataScalingTaskGroup.java   |   7 +-
 .../core/schedule/ScalingTaskScheduler.java        |   4 +
 .../scaling/core/spi/ScalingEntry.java             |   8 +-
 .../core/config/RdbmsConfigurationTest.java        |  10 +-
 .../importer/AbstractJDBCImporterTest.java         |  11 +-
 .../executor/importer/AbstractSqlBuilderTest.java  |   4 +-
 .../core/fixture/FixtureH2ScalingEntry.java        |   4 +-
 .../preparer/resumer/SyncPositionResumerTest.java  | 114 ++++++++++++++++
 .../InventoryDataScalingTaskGroupTest.java         |   2 +-
 .../inventory/InventoryDataScalingTaskTest.java    |   3 +
 .../scaling/mysql/BinlogPosition.java              |  23 +++-
 .../scaling/mysql/MySQLBinlogDumper.java           |   8 +-
 .../scaling/mysql/MySQLImporter.java               |   2 +-
 ...itionManager.java => MySQLPositionManager.java} |  25 ++--
 .../scaling/mysql/MySQLScalingEntry.java           |   6 +-
 .../AbstractResumablePositionManagerTest.java      |  76 +++++++++++
 ...agerTest.java => MySQLPositionManagerTest.java} |  12 +-
 .../scaling/postgresql/PostgreSQLImporter.java     |   2 +-
 ...Manager.java => PostgreSQLPositionManager.java} |  22 +--
 .../scaling/postgresql/PostgreSQLScalingEntry.java |   6 +-
 .../scaling/postgresql/PostgreSQLWalDumper.java    |   8 +-
 .../scaling/postgresql/WalPosition.java            |  15 ++-
 ...est.java => PostgreSQLPositionManagerTest.java} |  20 +--
 66 files changed, 1251 insertions(+), 280 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 54d5810..1407028 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -149,7 +149,7 @@ curl -X GET \
             "realTimeSyncTaskProgress": {
                 "id": "realtime-test",
                 "delayMillisecond": 1576563771372,
-                "logPosition": {
+                "position": {
                     "filename": "ON.000007",
                     "position": 177532875,
                     "serverId": 0
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 221943b..9e420af 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -149,7 +149,7 @@ Response:
             "realTimeSyncTaskProgress": {
                 "id": "realtime-test",
                 "delayMillisecond": 1576563771372,
-                "logPosition": {
+                "position": {
                     "filename": "ON.000007",
                     "position": 177532875,
                     "serverId": 0
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 3ee8f6e..10902ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -88,7 +88,7 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
     
     private void startJob(final ChannelHandlerContext channelHandlerContext, final String requestBody) {
         ScalingConfiguration scalingConfiguration = GSON.fromJson(requestBody, ScalingConfiguration.class);
-        ShardingScalingJob shardingScalingJob = new ShardingScalingJob("Local Sharding Scaling Job");
+        ShardingScalingJob shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem());
         shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration));
         log.info("start job : {}", requestBody);
         SCALING_JOB_CONTROLLER.start(shardingScalingJob);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index e45fb7c..a322495 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.fixture;
 
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -37,8 +37,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends LogPositionManager> getLogPositionManager() {
-        return FixtureNopLogManager.class;
+    public Class<? extends PositionManager> getPositionManager() {
+        return FixtureNopManager.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
similarity index 72%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 81bf56e..0a30ccc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -18,22 +18,22 @@
 package org.apache.shardingsphere.scaling.fixture;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 
 import javax.sql.DataSource;
 
 @RequiredArgsConstructor
-public final class FixtureNopLogManager implements LogPositionManager<NopLogPosition> {
+public final class FixtureNopManager implements PositionManager<NopPosition> {
     
     private final DataSource dataSource;
     
     @Override
-    public NopLogPosition getCurrentPosition() {
-        return new NopLogPosition();
+    public NopPosition getCurrentPosition() {
+        return new NopPosition();
     }
     
     @Override
-    public void updateCurrentPosition(final NopLogPosition newLogPosition) {
+    public void updateCurrentPosition(final NopPosition newPosition) {
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index f6cb56e..5e234ce 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-orchestration-repository-zookeeper-curator</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-infra-executor</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index de8046c..ca34fc5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -25,7 +24,6 @@ import lombok.Setter;
 /**
  * Job configuration.
  */
-@AllArgsConstructor
 @NoArgsConstructor
 @Setter
 @Getter
@@ -35,6 +33,8 @@ public final class JobConfiguration {
     
     private int retryTimes = 3;
     
+    private String jobName;
+    
     private String[] shardingTables;
     
     private int shardingItem;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index e8798d8..4468982 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -21,6 +21,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 import java.util.Map;
 import java.util.Set;
@@ -33,15 +34,19 @@ import java.util.Set;
 @EqualsAndHashCode
 public final class RdbmsConfiguration implements Cloneable {
     
+    private String dataSourceName;
+    
     private DataSourceConfiguration dataSourceConfiguration;
     
     private String tableName;
     
     private Map<String, Set<String>> shardingColumnsMap;
     
-    private String whereCondition;
+    private String primaryKey;
+    
+    private PositionManager positionManager;
     
-    private int spiltNum;
+    private Integer spiltNum;
     
     private Map<String, String> tableNameMap;
     
@@ -58,12 +63,4 @@ public final class RdbmsConfiguration implements Cloneable {
         return (RdbmsConfiguration) origin.clone();
     }
     
-    /**
-     * Get where condition.
-     *
-     * @return "" if whereCondition is null, otherwise whereCondition
-     */
-    public String getWhereCondition() {
-        return null == whereCondition ? "" : whereCondition;
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
similarity index 76%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
index fb17e65..a543769 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.core.config;
 
-import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
 
 /**
- * Log position interface.
+ * Resume configuration.
  */
-public interface LogPosition<T> extends Comparable<T>, Serializable {
+@Getter
+@Setter
+public final class ResumeConfiguration {
+    
+    private String serverLists;
+    
+    private String namespace;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index 992640f..d36099c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -17,10 +17,9 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
-import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
-
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
 
 /**
  * Global server configuration.
@@ -36,4 +35,6 @@ public final class ServerConfiguration implements YamlConfiguration {
     private int pushTimeout = 1000;
     
     private int workerThread = 30;
+    
+    private ResumeConfiguration resumeConfiguration;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
new file mode 100644
index 0000000..60ddf20
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+/**
+ * Rdbms configuration Util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RdbmsConfigurationUtil {
+    
+    /**
+     * Get sql where condition with primary key.
+     *
+     * @param rdbmsConfiguration rdbms configuration
+     * @return sql where condition
+     */
+    public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) {
+        return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager());
+    }
+    
+    private static String getWhereCondition(final String primaryKey, final PositionManager<PrimaryKeyPosition> positionManager) {
+        if (null == primaryKey || null == positionManager) {
+            return "";
+        }
+        PrimaryKeyPosition position = positionManager.getCurrentPosition();
+        return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
index f5975d0..d2c26b4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
@@ -62,12 +62,11 @@ public final class SyncConfigurationUtil {
         ShardingRuleConfiguration sourceRule = ConfigurationYamlConverter.loadShardingRuleConfiguration(scalingConfiguration.getRuleConfiguration().getSourceRule());
         Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
         filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfiguration.getJobConfiguration());
-        for (String each : dataSourceTableNameMap.keySet()) {
-            RdbmsConfiguration dumperConfiguration = createDumperConfiguration(sourceDatasource.get(each));
+        for (Map.Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
+            RdbmsConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey()));
             dumperConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
             RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
-            Map<String, String> tableNameMap = dataSourceTableNameMap.get(each);
-            result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), tableNameMap, dumperConfiguration, importerConfiguration));
+            result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), entry.getValue(), dumperConfiguration, importerConfiguration));
         }
         return result;
     }
@@ -147,8 +146,9 @@ public final class SyncConfigurationUtil {
         }
     }
     
-    private static RdbmsConfiguration createDumperConfiguration(final DataSourceConfiguration dataSourceConfiguration) {
+    private static RdbmsConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration) {
         RdbmsConfiguration result = new RdbmsConfiguration();
+        result.setDataSourceName(dataSourceName);
         Map<String, Object> dataSourceProperties = dataSourceConfiguration.getProps();
         JDBCDataSourceConfiguration dumperDataSourceConfiguration = new JDBCDataSourceConfiguration(
                 dataSourceProperties.containsKey("jdbcUrl") ? dataSourceProperties.get("jdbcUrl").toString() : dataSourceProperties.get("url").toString(),
@@ -189,5 +189,4 @@ public final class SyncConfigurationUtil {
         }
         return Collections.EMPTY_SET;
     }
-    
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
index dcefcb5..5fddd2c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 /**
  * Abstract sharding scaling executor.
@@ -29,6 +30,14 @@ public abstract class AbstractShardingScalingExecutor implements ShardingScaling
     @Setter(AccessLevel.PROTECTED)
     @Getter(AccessLevel.PROTECTED)
     private boolean running;
+    
+    @Setter
+    @Getter
+    private String taskId;
+    
+    @Setter
+    @Getter
+    private PositionManager positionManager;
 
     /**
      * Generic start implement.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
index a661acf..f3ab05e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.channel;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -55,7 +55,7 @@ public final class DistributionChannel implements Channel {
     
     private final Queue<Record> toBeAcknowledgeRecords = new ConcurrentLinkedQueue<>();
     
-    private final Map<LogPosition, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
+    private final Map<Position, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
     
     private ScheduledExecutorService scheduleAckRecordsExecutor;
     
@@ -78,10 +78,10 @@ public final class DistributionChannel implements Channel {
             List<Record> result = new LinkedList<>();
             while (!toBeAcknowledgeRecords.isEmpty()) {
                 Record record = toBeAcknowledgeRecords.peek();
-                if (pendingAcknowledgeRecords.containsKey(record.getLogPosition())) {
+                if (pendingAcknowledgeRecords.containsKey(record.getPosition())) {
                     result.add(record);
                     toBeAcknowledgeRecords.poll();
-                    pendingAcknowledgeRecords.remove(record.getLogPosition());
+                    pendingAcknowledgeRecords.remove(record.getPosition());
                 } else {
                     break;
                 }
@@ -107,7 +107,7 @@ public final class DistributionChannel implements Channel {
             channels.get(index).pushRecord(dataRecord);
         } else if (PlaceholderRecord.class.equals(record.getClass())) {
             toBeAcknowledgeRecords.add(record);
-            pendingAcknowledgeRecords.put(record.getLogPosition(), record);
+            pendingAcknowledgeRecords.put(record.getPosition(), record);
         } else {
             throw new RuntimeException("Not Support Record Type");
         }
@@ -161,7 +161,7 @@ public final class DistributionChannel implements Channel {
         @Override
         public void onAck(final List<Record> records) {
             for (Record record : records) {
-                pendingAcknowledgeRecords.put(record.getLogPosition(), record);
+                pendingAcknowledgeRecords.put(record.getPosition(), record);
             }
         }
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index b68039d..8cc8b55 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -23,15 +23,17 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
 import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
 
@@ -80,12 +82,12 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
     @Override
     public final void dump(final Channel channel) {
         try (Connection conn = dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()).getConnection()) {
-            String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), rdbmsConfiguration.getWhereCondition());
+            String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration));
             PreparedStatement ps = createPreparedStatement(conn, sql);
             ResultSet rs = ps.executeQuery();
             ResultSetMetaData metaData = rs.getMetaData();
             while (isRunning() && rs.next()) {
-                DataRecord record = new DataRecord(new NopLogPosition(), metaData.getColumnCount());
+                DataRecord record = new DataRecord(newPrimaryKeyPosition(rs), metaData.getColumnCount());
                 record.setType("BOOTSTRAP-INSERT");
                 record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
                 for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -93,15 +95,23 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
                 }
                 pushRecord(record);
             }
+            pushRecord(new FinishedRecord(new PrimaryKeyPosition.FinishedPosition()));
         } catch (SQLException e) {
             stop();
             channel.close();
             throw new SyncTaskExecuteException(e);
         } finally {
-            pushRecord(new FinishedRecord(new NopLogPosition()));
+            pushRecord(new FinishedRecord(new NopPosition()));
         }
     }
     
+    private PrimaryKeyPosition newPrimaryKeyPosition(final ResultSet rs) throws SQLException {
+        if (null == rdbmsConfiguration.getPrimaryKey()) {
+            return new PrimaryKeyPosition.PlaceholderPosition();
+        }
+        return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getCurrentPosition()).getEndValue());
+    }
+    
     /**
      * Create prepared statement.
      *
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
index 125fa99..20ae77b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -62,11 +62,11 @@ public final class DumperFactory {
      * New instance of log dumper.
      *
      * @param rdbmsConfiguration rdbms configuration
-     * @param position log position
+     * @param position position
      * @return log dumper
      */
     @SneakyThrows
-    public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition position) {
+    public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
         return newInstanceLogDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, position);
     }
     
@@ -75,12 +75,12 @@ public final class DumperFactory {
      *
      * @param databaseType database type
      * @param rdbmsConfiguration rdbms configuration
-     * @param position log position
+     * @param position position
      * @return log dumper
      */
     @SneakyThrows
-    public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final LogPosition position) {
+    public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final Position position) {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, LogPosition.class).newInstance(rdbmsConfiguration, position);
+        return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, Position.class).newInstance(rdbmsConfiguration, position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
index 8167a5b..c7a8e54 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 
 /**
  * Importer factory.
@@ -32,7 +32,7 @@ public final class ImporterFactory {
      * New instance of importer.
      *
      * @param rdbmsConfiguration rdbms configuration
-     * @param dataSourceManager data source factory
+     * @param dataSourceManager  data source factory
      * @return importer
      */
     public static Importer newInstance(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
@@ -42,9 +42,9 @@ public final class ImporterFactory {
     /**
      * New instance of importer.
      *
-     * @param databaseType database type
+     * @param databaseType       database type
      * @param rdbmsConfiguration rdbms configuration
-     * @param dataSourceManager data source factory
+     * @param dataSourceManager  data source factory
      * @return importer
      */
     @SneakyThrows
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index e3097e3..bed71b2 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -43,8 +43,8 @@ public final class DataRecord extends Record {
     
     private String tableName;
     
-    public DataRecord(final LogPosition logPosition, final int columnCount) {
-        super(logPosition);
+    public DataRecord(final Position position, final int columnCount) {
+        super(position);
         columns = new ArrayList<>(columnCount);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
index 2f10769..9d92680 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
@@ -17,14 +17,14 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 /**
  * Finished record.
  */
 public final class FinishedRecord extends Record {
     
-    public FinishedRecord(final LogPosition logPosition) {
-        super(logPosition);
+    public FinishedRecord(final Position position) {
+        super(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
index 30f642f..86d4e26 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
@@ -17,14 +17,14 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 /**
  * Placeholder record.
  */
 public final class PlaceholderRecord extends Record {
     
-    public PlaceholderRecord(final LogPosition logPosition) {
-        super(logPosition);
+    public PlaceholderRecord(final Position position) {
+        super(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
index 62a21cd..1bd6723 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
@@ -30,7 +30,7 @@ import lombok.Setter;
 @Setter
 public abstract class Record {
     
-    private final LogPosition logPosition;
+    private final Position position;
         
     private long commitTime;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
index b1fc2db..5fb6dc6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
@@ -48,5 +48,7 @@ public final class ShardingScalingJob {
     
     private final String jobName;
     
+    private final int shardingItem;
+    
     private String status = "RUNNING";
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
similarity index 76%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
index 7675f26..850c276 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
@@ -17,15 +17,23 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
 /**
- * Nop log position.
+ * Nop position.
  */
-public final class NopLogPosition implements LogPosition<NopLogPosition> {
+public final class NopPosition implements Position<NopPosition> {
     
     private static final long serialVersionUID = 1946907178847169020L;
     
     @Override
-    public int compareTo(final NopLogPosition nopLogPosition) {
+    public int compareTo(final NopPosition nopPosition) {
         return 0;
     }
+    
+    @Override
+    public JsonElement toJson() {
+        return new JsonObject();
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
similarity index 80%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
index fb17e65..b34e467 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
@@ -17,10 +17,18 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
+import com.google.gson.JsonElement;
+
 import java.io.Serializable;
 
 /**
- * Log position interface.
+ * Position interface.
  */
-public interface LogPosition<T> extends Comparable<T>, Serializable {
+public interface Position<T> extends Comparable<T>, Serializable {
+    
+    /**
+     * To json element.
+     * @return json element
+     */
+    JsonElement toJson();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
similarity index 82%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
index 085f03f..fac5976 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
@@ -20,20 +20,21 @@ package org.apache.shardingsphere.scaling.core.job.position;
 /**
  * Database itself data synchronize position manager.
  * Such as mysql binlog, postgreSQL wal.
+ * Or use primary key as position.
  */
-public interface LogPositionManager<T extends LogPosition> {
+public interface PositionManager<T extends Position> {
     
     /**
-     * Get current log position.
+     * Get current position.
      *
-     * @return log position
+     * @return position
      */
     T getCurrentPosition();
     
     /**
      * Update currentPosition.
      *
-     * @param newLogPosition new log position.
+     * @param newPosition new position.
      */
-    void updateCurrentPosition(T newLogPosition);
+    void updateCurrentPosition(T newPosition);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
similarity index 61%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
index 6359b0f..fe0ed84 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
@@ -26,21 +26,34 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 import javax.sql.DataSource;
 
 /**
- * Log manager factory.
+ * Position manager factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LogPositionManagerFactory {
+public final class PositionManagerFactory {
     
     /**
-     * New instance of log manager.
+     * New instance of position manager.
      *
      * @param databaseType database type
      * @param dataSource data source
-     * @return log manager
+     * @return position manager
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static LogPositionManager newInstanceLogManager(final String databaseType, final DataSource dataSource) {
+    public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getLogPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+        return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+    }
+    
+    /**
+     * New instance of position manager from a position string.
+     *
+     * @param databaseType database type
+     * @param position position in string form
+     * @return position manager
+     */
+    @SneakyThrows(ReflectiveOperationException.class)
+    public static PositionManager newInstance(final String databaseType, final String position) {
+        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+        return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
new file mode 100644
index 0000000..f624855
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * Use primary key as position.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class PrimaryKeyPosition implements Position {
+    
+    private static final Gson GSON = new Gson();
+    
+    private long beginValue;
+    
+    private long endValue;
+    
+    @Override
+    public int compareTo(final Object position) {
+        if (null == position) {
+            return 1;
+        }
+        return Long.compare(beginValue, ((PrimaryKeyPosition) position).getBeginValue());
+    }
+    
+    /**
+     * Transform primary key position from json to object; values are read as longs so large keys keep full precision.
+     *
+     * @param json json array holding the begin value followed by the end value; may be null or empty
+     * @return primary key position, or a placeholder position when json does not hold exactly two values
+     */
+    public static PrimaryKeyPosition fromJson(final String json) {
+        List<Long> values = GSON.fromJson(json, new com.google.gson.reflect.TypeToken<List<Long>>() { }.getType());
+        if (null != values && 2 == values.size()) {
+            return new PrimaryKeyPosition(values.get(0), values.get(1));
+        }
+        return new PlaceholderPosition();
+    }
+    
+    @Override
+    public JsonElement toJson() {
+        return GSON.toJsonTree(new long[]{beginValue, endValue});
+    }
+    
+    /**
+     * Finished flag position, marking that an inventory task has finished.
+     */
+    public static class FinishedPosition extends PrimaryKeyPosition {
+    
+    }
+    
+    /**
+     * Placeholder position for tables without a primary key.
+     */
+    public static class PlaceholderPosition extends PrimaryKeyPosition {
+    
+        public PlaceholderPosition() {
+            super(-1, -1);
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
similarity index 67%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
index 7675f26..b101aaf 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
@@ -17,15 +17,23 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
+import lombok.AllArgsConstructor;
+
 /**
- * Nop log position.
+ * Primary key position manager.
  */
-public final class NopLogPosition implements LogPosition<NopLogPosition> {
+@AllArgsConstructor
+public final class PrimaryKeyPositionManager implements PositionManager<PrimaryKeyPosition> {
+    
+    private PrimaryKeyPosition position;
     
-    private static final long serialVersionUID = 1946907178847169020L;
+    @Override
+    public PrimaryKeyPosition getCurrentPosition() {
+        return position;
+    }
     
     @Override
-    public int compareTo(final NopLogPosition nopLogPosition) {
-        return 0;
+    public void updateCurrentPosition(final PrimaryKeyPosition newPosition) {
+        position = newPosition;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
new file mode 100644
index 0000000..d524c13
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract resumable position manager.
+ */
+@Getter
+@Setter
+@Slf4j
+public abstract class AbstractResumablePositionManager implements ResumablePositionManager, Closeable {
+    
+    private static final Gson GSON = new Gson();
+    
+    private String databaseType;
+    
+    private String taskPath;
+    
+    private boolean available;
+    
+    private boolean resumable;
+    
+    private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
+    
+    private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
+    
+    @Override
+    public void persistInventoryPosition() {
+    }
+    
+    @Override
+    public void persistIncrementalPosition() {
+    }
+    
+    protected void resumeInventoryPosition(final String data) {
+        if (isEmpty(data)) {
+            return;
+        }
+        log.info("resume inventory position from {} = {}", taskPath, data);
+        InventoryPosition inventoryPosition = InventoryPosition.fromJson(data);
+        Map<String, PrimaryKeyPosition> unfinish = inventoryPosition.getUnfinish();
+        for (Map.Entry<String, PrimaryKeyPosition> entry : unfinish.entrySet()) {
+            getInventoryPositionManagerMap().put(entry.getKey(), new PrimaryKeyPositionManager(entry.getValue()));
+        }
+        for (String each : inventoryPosition.getFinished()) {
+            getInventoryPositionManagerMap().put(each, new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+        }
+    }
+    
+    protected void resumeIncrementalPosition(final String data) {
+        if (isEmpty(data)) {
+            return;
+        }
+        log.info("resume incremental position from {} = {}", taskPath, data);
+        Map<String, Object> incrementalPosition = GSON.fromJson(data, Map.class);
+        for (Map.Entry<String, Object> entry : incrementalPosition.entrySet()) {
+            getIncrementalPositionManagerMap().put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
+        }
+    }
+    
+    private boolean isEmpty(final String data) {
+        return null == data || "".equals(data);
+    }
+    
+    protected String getInventoryPositionData() {
+        JsonObject result = new JsonObject();
+        JsonObject unfinish = new JsonObject();
+        Set<String> finished = Sets.newHashSet();
+        for (Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionManagerMap().entrySet()) {
+            if (entry.getValue().getCurrentPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+                finished.add(entry.getKey());
+                continue;
+            }
+            unfinish.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+        }
+        result.add("unfinish", unfinish);
+        result.add("finished", GSON.toJsonTree(finished));
+        return result.toString();
+    }
+    
+    protected String getIncrementalPositionData() {
+        JsonObject result = new JsonObject();
+        for (Map.Entry<String, PositionManager> entry : getIncrementalPositionManagerMap().entrySet()) {
+            result.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+        }
+        return result.toString();
+    }
+    
+    @Override
+    public void close() {
+    
+    }
+    
+    @Getter
+    @Setter
+    private static final class InventoryPosition {
+        
+        private Map<String, PrimaryKeyPosition> unfinish;
+        
+        private Set<String> finished;
+        
+        /**
+         * Transform inventory position from json to object.
+         *
+         * @param data json object holding an "unfinish" object (key to position array) and a "finished" array of keys
+         * @return inventory position
+         */
+        public static InventoryPosition fromJson(final String data) {
+            InventoryPosition result = new InventoryPosition();
+            JsonObject json = JsonParser.parseString(data).getAsJsonObject();
+            // Pass each position's own JSON text on, rather than round-tripping through Map/Double and List.toString().
+            result.setUnfinish(json.getAsJsonObject("unfinish").entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> PrimaryKeyPosition.fromJson(entry.getValue().toString()))));
+            result.setFinished(GSON.fromJson(json.getAsJsonArray("finished"), Set.class));
+            return result;
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
similarity index 68%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
index de8046c..4a09625 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
@@ -15,28 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.job.position.resume;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
 /**
- * Job configuration.
+ * Fake resumable position manager used as the default.
  */
-@AllArgsConstructor
 @NoArgsConstructor
-@Setter
-@Getter
-public final class JobConfiguration {
-    
-    private int concurrency = 3;
-    
-    private int retryTimes = 3;
-    
-    private String[] shardingTables;
-    
-    private int shardingItem;
+public final class FakeResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
     
+    public FakeResumablePositionManager(final String databaseType, final String taskPath) {
+        setDatabaseType(databaseType);
+        setTaskPath(taskPath);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java
new file mode 100644
index 0000000..df3afdc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+import java.util.Map;
+
+/**
+ * Resumable position manager interface.
+ */
+public interface ResumablePositionManager {
+    
+    /**
+     * If it is available.
+     * @return is available
+     */
+    boolean isAvailable();
+    
+    /**
+     * If it has resumable data.
+     * @return is resumable
+     */
+    boolean isResumable();
+    
+    /**
+     * Get inventory position manager map.
+     * @return inventory position manager map
+     */
+    Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionManagerMap();
+    
+    /**
+     * Get incremental position manager map.
+     * @return incremental position manager map
+     */
+    Map<String, PositionManager> getIncrementalPositionManagerMap();
+    
+    /**
+     * Persist inventory position.
+     */
+    void persistInventoryPosition();
+    
+    /**
+     * Persist incremental position.
+     */
+    void persistIncrementalPosition();
+    
+    /**
+     * Close this manager.
+     */
+    void close();
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
similarity index 57%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
index 6359b0f..ff6f136 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
@@ -15,32 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.core.job.position.resume;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import javax.sql.DataSource;
 
 /**
- * Log manager factory.
+ * Resumable position manager factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LogPositionManagerFactory {
+public final class ResumablePositionManagerFactory {
+    
+    private static Class<? extends ResumablePositionManager> clazz = FakeResumablePositionManager.class;
+    
+    static {
+        if (new ZookeeperResumablePositionManager().isAvailable()) {
+            clazz = ZookeeperResumablePositionManager.class;
+        }
+    }
     
     /**
-     * New instance of log manager.
+     * New resumable position manager instance.
      *
      * @param databaseType database type
-     * @param dataSource data source
-     * @return log manager
+     * @param taskPath     task path for persisting data.
+     * @return resumable position manager
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static LogPositionManager newInstanceLogManager(final String databaseType, final DataSource dataSource) {
-        ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getLogPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+    public static ResumablePositionManager newInstance(final String databaseType, final String taskPath) {
+        return clazz.getConstructor(String.class, String.class).newInstance(databaseType, taskPath);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
new file mode 100644
index 0000000..5f2f045
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.orchestration.repository.api.config.OrchestrationCenterConfiguration;
+import org.apache.shardingsphere.orchestration.repository.zookeeper.CuratorZookeeperRepository;
+import org.apache.shardingsphere.scaling.core.config.ResumeConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Depends on ZooKeeper to manage position.
+ */
+@Slf4j
+public final class ZookeeperResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
+    
+    private static final String INVENTORY = "/inventory";
+    
+    private static final String INCREMENTAL = "/incremental";
+    
+    private static final CuratorZookeeperRepository ZOOKEEPER = new CuratorZookeeperRepository();
+    
+    private ScheduledExecutorService executor;
+    
+    private String inventoryPath;
+    
+    private String incrementalPath;
+    
+    public ZookeeperResumablePositionManager() {
+        ResumeConfiguration resumeConfiguration = ScalingContext.getInstance().getServerConfiguration().getResumeConfiguration();
+        if (null != resumeConfiguration) {
+            ZOOKEEPER.init(getCenterConfiguration(resumeConfiguration));
+            log.info("zookeeper resumable position manager is available.");
+            setAvailable(true);
+        }
+    }
+    
+    public ZookeeperResumablePositionManager(final String databaseType, final String taskPath) {
+        setDatabaseType(databaseType);
+        setTaskPath(taskPath);
+        this.inventoryPath = taskPath + INVENTORY;
+        this.incrementalPath = taskPath + INCREMENTAL;
+        resumePosition();
+        setResumable(!getInventoryPositionManagerMap().isEmpty() && !getIncrementalPositionManagerMap().isEmpty());
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleWithFixedDelay(this::persistPosition, 1, 1, TimeUnit.MINUTES);
+    }
+    
+    private OrchestrationCenterConfiguration getCenterConfiguration(final ResumeConfiguration resumeConfiguration) {
+        OrchestrationCenterConfiguration centerConfiguration =
+                new OrchestrationCenterConfiguration("zookeeper", resumeConfiguration.getServerLists(), resumeConfiguration.getNamespace(), new Properties());
+        return centerConfiguration;
+    }
+    
+    @Override
+    public void close() {
+        executor.submit(this::persistPosition);
+        executor.shutdown();
+    }
+    
+    private void resumePosition() {
+        resumeInventoryPosition(ZOOKEEPER.get(inventoryPath));
+        resumeIncrementalPosition(ZOOKEEPER.get(incrementalPath));
+    }
+    
+    private void persistPosition() {
+        persistIncrementalPosition();
+        persistInventoryPosition();
+    }
+    
+    @Override
+    public void persistInventoryPosition() {
+        String result = getInventoryPositionData();
+        ZOOKEEPER.persist(inventoryPath, result);
+        log.info("persist inventory position {} = {}", inventoryPath, result);
+    }
+    
+    @Override
+    public void persistIncrementalPosition() {
+        String result = getIncrementalPositionData();
+        ZOOKEEPER.persist(incrementalPath, result);
+        log.info("persist incremental position {} = {}", incrementalPath, result);
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index fd0f55a..e82f2ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -17,27 +17,30 @@
 
 package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.schedule.SyncTaskControlStatus;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceCheckerCheckerFactory;
+import org.apache.shardingsphere.scaling.core.job.preparer.resumer.SyncPositionResumer;
 import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryDataTaskSplitter;
+import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.schedule.SyncTaskControlStatus;
 
 import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Sharding scaling job preparer.
@@ -49,6 +52,8 @@ public final class ShardingScalingJobPreparer {
     
     private final InventoryDataTaskSplitter inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
     
+    private final SyncPositionResumer syncPositionResumer = new SyncPositionResumer();
+    
     /**
      * Do prepare work for sharding scaling job.
      *
@@ -58,14 +63,24 @@ public final class ShardingScalingJobPreparer {
         String databaseType = shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration().getDatabaseType().getName();
         try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigurations())) {
             checkDatasources(databaseType, dataSourceManager);
+            ResumablePositionManager resumablePositionManager = getResumablePositionManager(databaseType, shardingScalingJob);
+            if (resumablePositionManager.isResumable()) {
+                syncPositionResumer.resumePosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
+                return;
+            }
             initIncrementalDataTasks(databaseType, shardingScalingJob, dataSourceManager);
-            splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
+            initInventoryDataTasks(shardingScalingJob, dataSourceManager);
+            syncPositionResumer.persistPosition(shardingScalingJob, resumablePositionManager);
         } catch (PrepareFailedException ex) {
             log.warn("Preparing sharding scaling job {} : {} failed", shardingScalingJob.getJobId(), shardingScalingJob.getJobName(), ex);
             shardingScalingJob.setStatus(SyncTaskControlStatus.PREPARING_FAILURE.name());
         }
     }
     
+    private ResumablePositionManager getResumablePositionManager(final String databaseType, final ShardingScalingJob shardingScalingJob) {
+        return ResumablePositionManagerFactory.newInstance(databaseType, String.format("/%s/item-%d", shardingScalingJob.getJobName(), shardingScalingJob.getShardingItem()));
+    }
+    
     private void checkDatasources(final String databaseType, final DataSourceManager dataSourceManager) {
         DataSourceChecker dataSourceChecker = DataSourceCheckerCheckerFactory.newInstanceDataSourceChecker(databaseType);
         dataSourceChecker.checkConnection(dataSourceManager.getCachedDataSources().values());
@@ -73,35 +88,28 @@ public final class ShardingScalingJobPreparer {
         dataSourceChecker.checkVariable(dataSourceManager.getSourceDatasources().values());
     }
     
-    private void splitInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
+    private void initInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
         List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
             allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each, dataSourceManager));
         }
-        for (Collection<ScalingTask> each : groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+        for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
             shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
         }
     }
     
-    private List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
-        List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
-        for (int i = 0; i < taskNumber; i++) {
-            result.add(new LinkedList<>());
-        }
-        for (int i = 0; i < allInventoryDataTasks.size(); i++) {
-            result.get(i % taskNumber).add(allInventoryDataTasks.get(i));
-        }
-        return result;
-    }
-    
     private void initIncrementalDataTasks(final String databaseType, final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
-            LogPositionManager logPositionManager = instanceLogPositionManager(databaseType, dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
-            shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each, logPositionManager.getCurrentPosition()));
+            DataSourceConfiguration dataSourceConfiguration = each.getDumperConfiguration().getDataSourceConfiguration();
+            each.getDumperConfiguration().setPositionManager(instancePositionManager(databaseType, dataSourceManager.getDataSource(dataSourceConfiguration)));
+            shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
         }
     }
     
-    private LogPositionManager instanceLogPositionManager(final String databaseType, final DataSource dataSource) {
-        return LogPositionManagerFactory.newInstanceLogManager(databaseType, dataSource);
+    @SuppressWarnings("rawtypes")
+    private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
+        PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
+        positionManager.getCurrentPosition();
+        return positionManager;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
new file mode 100644
index 0000000..c1704da
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
+
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
+import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
+import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Sync position resumer.
+ */
+public final class SyncPositionResumer {
+    
+    private final SyncTaskFactory syncTaskFactory = new DefaultSyncTaskFactory();
+    
+    /**
+     * Resume position from this position manager.
+     *
+     * @param shardingScalingJob       sharding scaling job
+     * @param dataSourceManager        dataSource manager
+     * @param resumablePositionManager position manager to resume from
+     */
+    public void resumePosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+        resumeInventoryPosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
+        resumeIncrementalPosition(shardingScalingJob, resumablePositionManager);
+    }
+    
+    private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+        List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumablePositionManager);
+        for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+            shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
+        }
+    }
+    
+    private List<ScalingTask> getAllInventoryDataTasks(
+            final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+        List<ScalingTask> result = new LinkedList<>();
+        for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
+            MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
+            for (Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumablePositionManager).entrySet()) {
+                result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
+            }
+        }
+        return result;
+    }
+    
+    private SyncConfiguration newSyncConfiguration(
+            final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry) {
+        String[] splitTable = entry.getKey().split("#");
+        RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+        splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
+        splitDumperConfig.setPositionManager(entry.getValue());
+        if (splitTable.length == 2) {
+            splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
+        }
+        splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
+        return new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
+                splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
+    }
+    
+    private Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionMap(
+            final RdbmsConfiguration dumperConfiguration, final ResumablePositionManager resumablePositionManager) {
+        Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
+        return resumablePositionManager.getInventoryPositionManagerMap().entrySet().stream()
+                .filter(entry -> pattern.matcher(entry.getKey()).find())
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+    
+    private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
+        for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
+            each.getDumperConfiguration().setPositionManager(resumablePositionManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
+            shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+        }
+    }
+    
+    /**
+     * Persist position when initializing sync job.
+     *
+     * @param shardingScalingJob       sync job
+     * @param resumablePositionManager position manager used to persist position
+     */
+    public void persistPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
+        persistIncrementalPosition(shardingScalingJob.getIncrementalDataTasks(), resumablePositionManager);
+        persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumablePositionManager);
+    }
+    
+    private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumablePositionManager resumablePositionManager) {
+        for (ScalingTask each : inventoryDataTasks) {
+            if (each instanceof InventoryDataScalingTaskGroup) {
+                putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumablePositionManager);
+            }
+        }
+        resumablePositionManager.persistInventoryPosition();
+    }
+    
+    private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumablePositionManager resumablePositionManager) {
+        for (ScalingTask each : scalingTasks) {
+            resumablePositionManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
+        }
+    }
+    
+    private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumablePositionManager resumablePositionManager) {
+        for (ScalingTask each : incrementalDataTasks) {
+            resumablePositionManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
+        }
+        resumablePositionManager.persistIncrementalPosition();
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 883af8f..0012b01 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -22,11 +22,15 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
 import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
 import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -36,14 +40,14 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import javax.sql.DataSource;
-
 /**
  * Inventory data task splitter.
  */
 @Slf4j
 public final class InventoryDataTaskSplitter {
     
+    private final SyncTaskFactory syncTaskFactory = new DefaultSyncTaskFactory();
+    
     /**
      * Split inventory data to multi-tasks.
      *
@@ -54,7 +58,7 @@ public final class InventoryDataTaskSplitter {
     public Collection<ScalingTask> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
         Collection<ScalingTask> result = new LinkedList<>();
         for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) {
-            result.add(new InventoryDataScalingTask(each));
+            result.add(syncTaskFactory.createInventoryDataSyncTask(each));
         }
         return result;
     }
@@ -78,6 +82,7 @@ public final class InventoryDataTaskSplitter {
         for (String each : syncConfiguration.getTableNameMap().keySet()) {
             RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
             dumperConfig.setTableName(each);
+            dumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
             result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
                 dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
         }
@@ -116,6 +121,7 @@ public final class InventoryDataTaskSplitter {
         Collection<SyncConfiguration> result = new LinkedList<>();
         RdbmsConfiguration dumperConfiguration = syncConfiguration.getDumperConfiguration();
         String primaryKey = metaDataManager.getTableMetaData(dumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0);
+        dumperConfiguration.setPrimaryKey(primaryKey);
         try (Connection connection = dataSource.getConnection()) {
             PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, dumperConfiguration.getTableName()));
             ResultSet rs = ps.executeQuery();
@@ -126,10 +132,10 @@ public final class InventoryDataTaskSplitter {
             for (int i = 0; i < concurrency && min <= max; i++) {
                 RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
                 if (i < concurrency - 1) {
-                    splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, min + step));
+                    splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, min + step)));
                     min = min + step + 1;
                 } else {
-                    splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, max));
+                    splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, max)));
                 }
                 splitDumperConfig.setSpiltNum(i);
                 result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
new file mode 100644
index 0000000..ca69ddc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Scaling job prepare util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class JobPrepareUtil {
+    
+    /**
+     * Group inventory data tasks by task number.
+     *
+     * <p>Tasks are dealt out round-robin in list order: the task at index {@code i} goes to group {@code i % taskNumber}.
+     * Exactly {@code taskNumber} groups are returned, so some groups are empty when there are fewer tasks than groups.</p>
+     *
+     * @param taskNumber number of groups to create, must be positive when there are tasks to distribute
+     * @param allInventoryDataTasks all inventory data tasks
+     * @return task group list
+     * @throws IllegalArgumentException if task number is negative, or is zero while tasks are present
+     */
+    public static List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+        if (taskNumber <= 0 && !allInventoryDataTasks.isEmpty()) {
+            throw new IllegalArgumentException(String.format("Task number must be positive, but was %d", taskNumber));
+        }
+        List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
+        for (int i = 0; i < taskNumber; i++) {
+            result.add(new LinkedList<>());
+        }
+        // Iterate instead of calling get(i): the input may be a LinkedList, where each indexed access is linear.
+        int index = 0;
+        for (ScalingTask each : allInventoryDataTasks) {
+            result.get(index++ % taskNumber).add(each);
+        }
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
index 531c446..f9817b2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
@@ -17,14 +17,13 @@
 
 package org.apache.shardingsphere.scaling.core.job.task;
 
-import java.util.Collection;
-
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
 
+import java.util.Collection;
+
 /**
  * Default sync task factory.
  */
@@ -41,7 +40,7 @@ public final class DefaultSyncTaskFactory implements SyncTaskFactory {
     }
     
     @Override
-    public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration, final LogPosition logPosition) {
-        return new IncrementalDataScalingTask(syncConfiguration, logPosition);
+    public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration) {
+        return new IncrementalDataScalingTask(syncConfiguration);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 320a0dc..66ba771 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task;
 
 import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 /**
  * Sync task interface.
@@ -31,4 +32,18 @@ public interface ScalingTask extends ShardingScalingExecutor {
      * @return migrate progress
      */
     SyncProgress getProgress();
+    
+    /**
+     * Get position manager.
+     *
+     * @return position manager
+     */
+    PositionManager getPositionManager();
+    
+    /**
+     * Get task id.
+     *
+     * @return task id
+     */
+    String getTaskId();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
index ceb9f20..9df782c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
@@ -17,14 +17,13 @@
 
 package org.apache.shardingsphere.scaling.core.job.task;
 
-import java.util.Collection;
-
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
 
+import java.util.Collection;
+
 /**
  * Sync task factory.
  */
@@ -33,7 +32,7 @@ public interface SyncTaskFactory {
     /**
      * Create inventory data sync task group.
      *
-     * @param inventoryDataScalingTasks  inventory data sync tasks
+     * @param inventoryDataScalingTasks inventory data sync tasks
      * @return inventory data sync task group
      */
     InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask> inventoryDataScalingTasks);
@@ -50,8 +49,7 @@ public interface SyncTaskFactory {
      * Create incremental data sync task.
      *
      * @param syncConfiguration sync configuration
-     * @param logPosition  log position of incremental data start
      * @return incremental data sync task
      */
-    IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration, LogPosition logPosition);
+    IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 5edd960..c690e39 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -20,20 +20,18 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.DistributionChannel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -51,34 +49,29 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     
     private final DataSourceManager dataSourceManager;
     
-    private final String syncTaskId;
-    
-    private LogPosition logPosition;
-    
     private Dumper dumper;
     
     private long delayMillisecond;
     
-    public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration, final LogPosition logPosition) {
+    public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) {
         this.syncConfiguration = syncConfiguration;
         this.dataSourceManager = new DataSourceManager();
-        this.logPosition = logPosition;
-        DataSourceMetaData dataSourceMetaData = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration().getDataSourceMetaData();
-        syncTaskId = String.format("incremental-%s", null != dataSourceMetaData.getCatalog() ? dataSourceMetaData.getCatalog() : dataSourceMetaData.getSchema());
+        setTaskId(syncConfiguration.getDumperConfiguration().getDataSourceName());
+        setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
     }
     
     @Override
     public void start() {
         syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
-        dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), logPosition);
+        dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getCurrentPosition());
         Collection<Importer> importers = instanceImporters();
         instanceChannel(importers);
         Future future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
-        
+            
             @Override
             public void onSuccess() {
             }
-        
+            
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("get an error when migrating the increment data", throwable);
@@ -101,7 +94,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     private void instanceChannel(final Collection<Importer> importers) {
         DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            logPosition = lastHandledRecord.getLogPosition();
+            getPositionManager().updateCurrentPosition(lastHandledRecord.getPosition());
             delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
         });
         dumper.setChannel(channel);
@@ -115,7 +108,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
             future.get();
         } catch (InterruptedException ignored) {
         } catch (ExecutionException e) {
-            throw new SyncTaskExecuteException(String.format("Task %s execute failed ", syncTaskId), e.getCause());
+            throw new SyncTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), e.getCause());
         }
     }
     
@@ -129,6 +122,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     
     @Override
     public SyncProgress getProgress() {
-        return new IncrementalDataSyncTaskProgress(syncTaskId, delayMillisecond, logPosition);
+        return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getCurrentPosition());
     }
+    
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
index 1e5bd06..7ba67b1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.task.incremental;
 
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -34,5 +34,5 @@ public final class IncrementalDataSyncTaskProgress implements SyncProgress {
     
     private final long delayMillisecond;
     
-    private final LogPosition logPosition;
+    private final Position position;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 872f55b..ff2880b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -21,25 +21,27 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
+import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,14 +50,12 @@ import java.util.concurrent.atomic.AtomicLong;
  * Table slice execute task.
  */
 @Slf4j
-public final class InventoryDataScalingTask implements ScalingTask {
+public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
     
     private final SyncConfiguration syncConfiguration;
     
     private final DataSourceManager dataSourceManager;
     
-    private final String syncTaskId;
-    
     private long estimatedRows;
     
     private final AtomicLong syncedRows = new AtomicLong();
@@ -69,13 +69,13 @@ public final class InventoryDataScalingTask implements ScalingTask {
     public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
         this.syncConfiguration = syncConfiguration;
         this.dataSourceManager = dataSourceManager;
-        syncTaskId = generateSyncTaskId(syncConfiguration.getDumperConfiguration());
+        setTaskId(generateSyncTaskId(syncConfiguration.getDumperConfiguration()));
+        setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
     }
     
     private String generateSyncTaskId(final RdbmsConfiguration dumperConfiguration) {
-        DataSourceMetaData dataSourceMetaData = dumperConfiguration.getDataSourceConfiguration().getDataSourceMetaData();
-        String result = String.format("inventory-%s-%s", Optional.ofNullable(dataSourceMetaData.getCatalog()).orElse(dataSourceMetaData.getSchema()), dumperConfiguration.getTableName());
-        return null == dumperConfiguration.getWhereCondition() ? result : result + "#" + dumperConfiguration.getSpiltNum();
+        String result = String.format("%s.%s", dumperConfiguration.getDataSourceName(), dumperConfiguration.getTableName());
+        return null == dumperConfiguration.getSpiltNum() ? result : result + "#" + dumperConfiguration.getSpiltNum();
     }
     
     @Override
@@ -89,7 +89,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
             @Override
             public void onSuccess() {
             }
-    
+            
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("get an error when migrating the inventory data", throwable);
@@ -106,7 +106,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
         try (Connection connection = dataSource.getConnection()) {
             ResultSet resultSet = connection.prepareStatement(String.format("SELECT COUNT(*) FROM %s %s",
                     syncConfiguration.getDumperConfiguration().getTableName(),
-                    syncConfiguration.getDumperConfiguration().getWhereCondition()))
+                    RdbmsConfigurationUtil.getWhereCondition(syncConfiguration.getDumperConfiguration())))
                     .executeQuery();
             resultSet.next();
             estimatedRows = resultSet.getInt(1);
@@ -124,8 +124,12 @@ public final class InventoryDataScalingTask implements ScalingTask {
         MemoryChannel channel = new MemoryChannel(records -> {
             int count = 0;
             for (Record record : records) {
-                if (DataRecord.class.equals(record.getClass())) {
+                if (record instanceof DataRecord) {
                     count++;
+                } else if (record instanceof FinishedRecord) {
+                    if (record.getPosition() instanceof PrimaryKeyPosition) {
+                        getPositionManager().updateCurrentPosition(record.getPosition());
+                    }
                 }
             }
             syncedRows.addAndGet(count);
@@ -139,7 +143,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
             future.get();
         } catch (InterruptedException ignored) {
         } catch (ExecutionException e) {
-            throw new SyncTaskExecuteException(String.format("Task %s execute failed ", syncTaskId), e.getCause());
+            throw new SyncTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), e.getCause());
         }
     }
     
@@ -153,10 +157,6 @@ public final class InventoryDataScalingTask implements ScalingTask {
     
     @Override
     public SyncProgress getProgress() {
-        return new InventoryDataSyncTaskProgress(syncTaskId, estimatedRows, syncedRows.get());
-    }
-    
-    @Override
-    public void run() {
+        return new InventoryDataSyncTaskProgress(getTaskId(), estimatedRows, syncedRows.get());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
index d85fc9f..80eefed 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
@@ -17,18 +17,19 @@
 
 package org.apache.shardingsphere.scaling.core.job.task.inventory;
 
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
-import lombok.extern.slf4j.Slf4j;
-
 import java.util.Collection;
 
 /**
  * Inventory data sync task group.
  */
 @Slf4j
+@Getter
 public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor implements ScalingTask {
     
     private final Collection<ScalingTask> scalingTasks;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index c18705d..30be580 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,6 +62,10 @@ public final class ScalingTaskScheduler implements Runnable {
     public void run() {
         shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
         ExecuteCallback inventoryDataTaskCallback = createInventoryDataTaskCallback();
+        if (shardingScalingJob.getInventoryDataTasks().size() == 0) {
+            executeIncrementalDataSyncTask();
+            return;
+        }
         for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
             ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 1ea3139..4e14486 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.spi;
 
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
@@ -44,11 +44,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
     Class<? extends LogDumper> getLogDumperClass();
 
     /**
-     * Get log position manager type.
+     * Get position manager type.
      *
-     * @return log manager type
+     * @return position manager type
      */
-    Class<? extends LogPositionManager> getLogPositionManager();
+    Class<? extends PositionManager> getPositionManager();
     
     /**
      * Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
index a5cd6e0..66c0774 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -38,8 +41,9 @@ public final class RdbmsConfigurationTest {
     @Test
     public void assertGetWhereCondition() {
         RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration();
-        assertThat(rdbmsConfiguration.getWhereCondition(), is(""));
-        rdbmsConfiguration.setWhereCondition("WHERE 1=1");
-        assertThat(rdbmsConfiguration.getWhereCondition(), is("WHERE 1=1"));
+        assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is(""));
+        rdbmsConfiguration.setPrimaryKey("id");
+        rdbmsConfiguration.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 10)));
+        assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 5355fc8..abd4069 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -24,17 +24,19 @@ import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -43,7 +45,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.sql.DataSource;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -143,12 +144,12 @@ public final class AbstractJDBCImporterTest {
     private List<Record> mockRecords(final DataRecord dataRecord) {
         List<Record> result = new LinkedList<>();
         result.add(dataRecord);
-        result.add(new FinishedRecord(new NopLogPosition()));
+        result.add(new FinishedRecord(new NopPosition()));
         return result;
     }
     
     private DataRecord getDataRecord(final String recordType) {
-        DataRecord result = new DataRecord(new NopLogPosition(), 3);
+        DataRecord result = new DataRecord(new NopPosition(), 3);
         result.setTableName(TABLE_NAME);
         result.setType(recordType);
         result.addColumn(new Column("id", 1, false, true));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 22f9ea8..3419d46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 import com.google.common.collect.Sets;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.junit.Before;
@@ -89,7 +89,7 @@ public class AbstractSqlBuilderTest {
     }
     
     private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(new NopLogPosition(), 4);
+        DataRecord result = new DataRecord(new NopPosition(), 4);
         result.setTableName(tableName);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("sc", "", false, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 5fd7b4c..19f1f95 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.core.fixture;
 
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -37,7 +37,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends LogPositionManager> getLogPositionManager() {
+    public Class<? extends PositionManager> getPositionManager() {
         return null;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
new file mode 100644
index 0000000..34a679e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
+
+import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for the resumePosition and persistPosition methods of {@link SyncPositionResumer}.
+ */
+public final class SyncPositionResumerTest {
+    
+    private static final String DATA_SOURCE_URL = "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
+    
+    private static final String USERNAME = "root";
+    
+    private static final String PASSWORD = "password";
+    
+    private ShardingScalingJob shardingScalingJob;
+    
+    private ResumablePositionManager resumablePositionManager;
+    
+    private SyncPositionResumer syncPositionResumer;
+    
+    @Before
+    public void setUp() {
+        ScalingContext.getInstance().init(new ServerConfiguration());
+        shardingScalingJob = new ShardingScalingJob("scalingTest", 0);
+        shardingScalingJob.getSyncConfigurations().add(mockSyncConfiguration());
+        resumablePositionManager = ResumablePositionManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
+        syncPositionResumer = new SyncPositionResumer();
+    }
+    
+    @Test
+    public void assertResumePosition() {
+        // NOTE(review): inventory is keyed by "ds0" and incremental by "ds0.t_order" here;
+        // confirm this matches the key format SyncPositionResumer expects.
+        resumablePositionManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+        resumablePositionManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
+        syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumablePositionManager);
+        // JUnit's assertEquals takes the expected value first, so failure messages read correctly.
+        assertEquals(1, shardingScalingJob.getIncrementalDataTasks().size());
+        assertEquals(3, shardingScalingJob.getInventoryDataTasks().size());
+    }
+    
+    @Test
+    public void assertPersistPosition() {
+        // A mock is used so that only the delegation to the position manager is verified.
+        ResumablePositionManager mockedPositionManager = mock(ResumablePositionManager.class);
+        syncPositionResumer.persistPosition(shardingScalingJob, mockedPositionManager);
+        verify(mockedPositionManager).persistIncrementalPosition();
+        verify(mockedPositionManager).persistInventoryPosition();
+    }
+    
+    private PositionManager mockPositionManager() {
+        // Stub position manager: reports no position and ignores updates.
+        return new PositionManager() {
+            @Override
+            public Position getCurrentPosition() {
+                return null;
+            }
+            
+            @Override
+            public void updateCurrentPosition(final Position newPosition) {
+            
+            }
+        };
+    }
+    
+    private SyncConfiguration mockSyncConfiguration() {
+        RdbmsConfiguration dumperConfig = mockDumperConfig();
+        RdbmsConfiguration importerConfig = new RdbmsConfiguration();
+        Map<String, String> tableMap = new HashMap<>();
+        tableMap.put("t_order", "t_order");
+        return new SyncConfiguration(3, tableMap,
+                dumperConfig, importerConfig);
+    }
+    
+    private RdbmsConfiguration mockDumperConfig() {
+        DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
+        RdbmsConfiguration result = new RdbmsConfiguration();
+        result.setDataSourceName("ds0");
+        result.setDataSourceConfiguration(dataSourceConfiguration);
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
index deae566..87d34fb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.scaling.core.job.task.inventory;
 
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.junit.After;
 import org.junit.Before;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 517444c..a1ffd8c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -94,6 +96,7 @@ public final class InventoryDataScalingTaskTest {
         RdbmsConfiguration result = new RdbmsConfiguration();
         result.setDataSourceConfiguration(dataSourceConfiguration);
         result.setTableName("t_order");
+        result.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(1, 100)));
         return result;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
index 5599dc6..3549079 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
@@ -17,7 +17,11 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.Expose;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -27,16 +31,20 @@ import lombok.Setter;
 /**
  * Binlog Position.
  */
-@Getter
-@Setter
-@RequiredArgsConstructor
 @AllArgsConstructor
-public class BinlogPosition implements LogPosition<BinlogPosition> {
+@RequiredArgsConstructor
+@Setter
+@Getter
+public class BinlogPosition implements Position<BinlogPosition> {
     
     private static final long serialVersionUID = -4917415481787093677L;
     
+    private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
+    
+    @Expose
     private final String filename;
     
+    @Expose
     private final long position;
     
     private long serverId;
@@ -54,4 +62,9 @@ public class BinlogPosition implements LogPosition<BinlogPosition> {
     private long toLong() {
         return Long.parseLong(filename.substring(filename.lastIndexOf(".") + 1)) << 32 | position;
     }
+    
+    @Override
+    public JsonElement toJson() {
+        return GSON.toJsonTree(this);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index b1bb0e4..aaadbba 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -22,9 +22,9 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
@@ -65,7 +65,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
     @Setter
     private Channel channel;
     
-    public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition binlogPosition) {
+    public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position binlogPosition) {
         this.binlogPosition = (BinlogPosition) binlogPosition;
         if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
             throw new UnsupportedOperationException("MySQLBinlogDumper only support JDBCDataSourceConfiguration");
@@ -93,7 +93,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
                 handleEvent(channel, uri, event);
             }
         }
-        pushRecord(channel, new FinishedRecord(new NopLogPosition()));
+        pushRecord(channel, new FinishedRecord(new NopPosition()));
     }
     
     private void handleEvent(final Channel channel, final JdbcUri uri, final AbstractBinlogEvent event) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index c34538c..521618c 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.scaling.mysql;
 
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 
 /**
  * MySQL importer.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
similarity index 74%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
index 666d798..c55859c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import com.google.gson.Gson;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -27,15 +27,24 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * MySQL log manager, based on binlog mechanism.
+ * MySQL position manager, based on binlog mechanism.
  */
-@RequiredArgsConstructor
-public final class MySQLLogPositionManager implements LogPositionManager<BinlogPosition> {
+public final class MySQLPositionManager implements PositionManager<BinlogPosition> {
     
-    private final DataSource dataSource;
+    private static final Gson GSON = new Gson();
+    
+    private DataSource dataSource;
     
     private BinlogPosition currentPosition;
     
+    public MySQLPositionManager(final DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+    
+    public MySQLPositionManager(final String position) {
+        currentPosition = GSON.fromJson(position, BinlogPosition.class);
+    }
+    
     @Override
     public BinlogPosition getCurrentPosition() {
         if (null == currentPosition) {
@@ -60,7 +69,7 @@ public final class MySQLLogPositionManager implements LogPositionManager<BinlogP
     }
     
     @Override
-    public void updateCurrentPosition(final BinlogPosition newLogPosition) {
-        this.currentPosition = newLogPosition;
+    public void updateCurrentPosition(final BinlogPosition newPosition) {
+        this.currentPosition = newPosition;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 51e2959..29dfb1b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.mysql;
 
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -40,8 +40,8 @@ public final class MySQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends LogPositionManager> getLogPositionManager() {
-        return MySQLLogPositionManager.class;
+    public Class<? extends PositionManager> getPositionManager() {
+        return MySQLPositionManager.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
new file mode 100644
index 0000000..33dd347
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+import org.apache.shardingsphere.scaling.mysql.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
+import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for resuming and serializing positions with {@link AbstractResumablePositionManager}.
+ */
+public final class AbstractResumablePositionManagerTest {
+    
+    private AbstractResumablePositionManager resumablePositionManager;
+    
+    private final String incrementalPosition = "{\"ds0\":{\"filename\":\"mysql-bin.000001\",\"position\":4},\"ds1\":{\"filename\":\"mysql-bin.000002\",\"position\":4}}";
+    
+    private final String inventoryPosition = "{\"unfinish\":{\"ds1.t_order_1#0\":[0,200],\"ds0.t_order_1#0\":[0,100]},\"finished\":[\"ds0.t_order_1#1\"]}";
+    
+    @Before
+    public void setUp() throws Exception {
+        resumablePositionManager = new AbstractResumablePositionManager() {
+        };
+        resumablePositionManager.setDatabaseType("MySQL");
+        resumablePositionManager.setTaskPath("/scalingTest/item-0");
+        // Put the MySQL scaling entry straight into ScalingEntryLoader's SCALING_ENTRY_MAP via reflection.
+        ReflectionUtil.getFieldValueFromClass(new ScalingEntryLoader(), "SCALING_ENTRY_MAP", Map.class).put("MySQL", new MySQLScalingEntry());
+    }
+    
+    @Test
+    public void assertResumeIncrementalPosition() {
+        resumablePositionManager.resumeIncrementalPosition(incrementalPosition);
+        assertEquals(2, resumablePositionManager.getIncrementalPositionManagerMap().size());
+    }
+    
+    @Test
+    public void assertResumeInventoryPosition() {
+        resumablePositionManager.resumeInventoryPosition(inventoryPosition);
+        assertEquals(3, resumablePositionManager.getInventoryPositionManagerMap().size());
+    }
+    
+    @Test
+    public void assertGetIncrementalPositionData() {
+        resumablePositionManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
+        resumablePositionManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
+        assertEquals(incrementalPosition, resumablePositionManager.getIncrementalPositionData());
+    }
+    
+    @Test
+    public void assertGetInventoryPositionData() {
+        resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+        resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+        resumablePositionManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 200)));
+        // Expected value goes first so JUnit's failure message labels the two strings correctly.
+        assertEquals(inventoryPosition, resumablePositionManager.getInventoryPositionData());
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
similarity index 88%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 92d13dd..9b0c753 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class MySQLLogPositionManagerTest {
+public final class MySQLPositionManagerTest {
     
     private static final String LOG_FILE_NAME = "binlog-000001";
     
@@ -60,8 +60,8 @@ public final class MySQLLogPositionManagerTest {
     
     @Test
     public void assertGetCurrentPosition() {
-        MySQLLogPositionManager mysqlLogManager = new MySQLLogPositionManager(dataSource);
-        BinlogPosition actual = mysqlLogManager.getCurrentPosition();
+        MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
+        BinlogPosition actual = mysqlPositionManager.getCurrentPosition();
         assertThat(actual.getServerId(), is(SERVER_ID));
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
@@ -69,10 +69,10 @@ public final class MySQLLogPositionManagerTest {
     
     @Test
     public void assertUpdateCurrentPosition() {
-        MySQLLogPositionManager mysqlLogManager = new MySQLLogPositionManager(dataSource);
+        MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
         BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID);
-        mysqlLogManager.updateCurrentPosition(expected);
-        assertThat(mysqlLogManager.getCurrentPosition(), is(expected));
+        mysqlPositionManager.updateCurrentPosition(expected);
+        assertThat(mysqlPositionManager.getCurrentPosition(), is(expected));
     }
     
     private PreparedStatement mockPositionStatement() throws SQLException {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 7e85a70..5995228 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.scaling.postgresql;
 
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 
 /**
  * postgreSQL importer.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
similarity index 84%
rename from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 4f025fb..df3193f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -17,8 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.util.PSQLException;
 
@@ -29,10 +28,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
- * PostgreSQL log position manager.
+ * PostgreSQL position manager.
  */
-@RequiredArgsConstructor
-public final class PostgreSQLLogPositionManager implements LogPositionManager<WalPosition> {
+public final class PostgreSQLPositionManager implements PositionManager<WalPosition> {
     
     public static final String SLOT_NAME = "sharding_scaling";
     
@@ -40,10 +38,18 @@ public final class PostgreSQLLogPositionManager implements LogPositionManager<Wa
     
     public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
-    private final DataSource dataSource;
+    private DataSource dataSource;
     
     private WalPosition currentPosition;
     
+    public PostgreSQLPositionManager(final DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+    
+    public PostgreSQLPositionManager(final String position) {
+        this.currentPosition = new WalPosition(LogSequenceNumber.valueOf(position));
+    }
+    
     @Override
     public WalPosition getCurrentPosition() {
         if (null == currentPosition) {
@@ -89,7 +95,7 @@ public final class PostgreSQLLogPositionManager implements LogPositionManager<Wa
     }
     
     @Override
-    public void updateCurrentPosition(final WalPosition newLogPosition) {
-        currentPosition = newLogPosition;
+    public void updateCurrentPosition(final WalPosition newPosition) {
+        currentPosition = newPosition;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index e250552..e70aaba 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.postgresql;
 
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -40,8 +40,8 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends LogPositionManager> getLogPositionManager() {
-        return PostgreSQLLogPositionManager.class;
+    public Class<? extends PositionManager> getPositionManager() {
+        return PostgreSQLPositionManager.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index eb5f7ef..4811a9a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
@@ -58,8 +58,8 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor i
     @Setter
     private Channel channel;
     
-    public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition logPosition) {
-        walPosition = (WalPosition) logPosition;
+    public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
+        walPosition = (WalPosition) position;
         if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
         }
@@ -79,7 +79,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor i
             PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration());
             decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
             PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
-                    PostgreSQLLogPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
+                    PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
             while (isRunning()) {
                 ByteBuffer msg = stream.readPending();
                 if (msg == null) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
index 8fdc1ba..39eabec 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
@@ -17,20 +17,24 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.postgresql.replication.LogSequenceNumber;
 
 /**
  * PostgreSQL wal position.
  */
-@Getter
 @RequiredArgsConstructor
-public class WalPosition implements LogPosition<WalPosition> {
+@Getter
+public class WalPosition implements Position<WalPosition> {
     
     private static final long serialVersionUID = -3498484556749679001L;
     
+    private static final Gson GSON = new Gson();
+    
     private final LogSequenceNumber logSequenceNumber;
     
     @Override
@@ -42,4 +46,9 @@ public class WalPosition implements LogPosition<WalPosition> {
         long o2 = walPosition.getLogSequenceNumber().asLong();
         return Long.compare(o1, o2);
     }
+    
+    @Override
+    public JsonElement toJson() {
+        return GSON.toJsonTree(logSequenceNumber.asLong());
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
similarity index 84%
rename from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index 7632b03..2e6356b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLLogPositionManagerTest {
+public final class PostgreSQLPositionManagerTest {
     
     private static final String POSTGRESQL_96_LSN = "0/14EFDB8";
     
@@ -66,35 +66,35 @@ public final class PostgreSQLLogPositionManagerTest {
     
     @Test
     public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
-        PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+        PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        WalPosition actual = postgreSQLLogManager.getCurrentPosition();
+        WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
     @Test
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
-        PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+        PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
-        WalPosition actual = postgreSQLLogManager.getCurrentPosition();
+        WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
     @Test(expected = RuntimeException.class)
     public void assertGetCurrentPositionThrowException() throws SQLException {
-        PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+        PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
-        postgreSQLLogManager.getCurrentPosition();
+        postgreSQLPositionManager.getCurrentPosition();
     }
     
     @Test
     public void assertUpdateCurrentPosition() {
-        PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+        PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
-        postgreSQLLogManager.updateCurrentPosition(expected);
-        assertThat(postgreSQLLogManager.getCurrentPosition(), is(expected));
+        postgreSQLPositionManager.updateCurrentPosition(expected);
+        assertThat(postgreSQLPositionManager.getCurrentPosition(), is(expected));
     }
     
     private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {