You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/06 15:46:36 UTC

[shardingsphere] branch master updated: add generics (#6678)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acc64bd  add generics (#6678)
acc64bd is described below

commit acc64bdf0b9b994ca465c2a63a06e6db8221935c
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Aug 6 23:46:22 2020 +0800

    add generics (#6678)
    
    * add generics to GSON.fromJson
    
    * add generics.
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/fixture/FixtureH2ScalingEntry.java     |  3 +-
 .../scaling/fixture/FixtureNopManager.java         | 22 ++++++++--
 .../scaling/core/config/RdbmsConfiguration.java    |  1 +
 .../core/config/utils/RdbmsConfigurationUtil.java  |  1 +
 .../executor/AbstractShardingScalingExecutor.java  |  5 ++-
 .../executor/dumper/AbstractJDBCDumper.java        | 15 ++++---
 .../executor/importer/AbstractJDBCImporter.java    |  3 +-
 .../scaling/core/job/ShardingScalingJob.java       |  6 ++-
 ...yPositionManager.java => FinishedPosition.java} | 17 ++++++--
 ...sitionManager.java => IncrementalPosition.java} |  8 +---
 ...PositionManager.java => InventoryPosition.java} |  8 +---
 ...nManager.java => InventoryPositionManager.java} |  6 +--
 ...sitionManager.java => PlaceholderPosition.java} | 17 ++++++--
 .../core/job/position/PositionManagerFactory.java  | 10 +++--
 .../core/job/position/PrimaryKeyPosition.java      | 36 +---------------
 .../resume/AbstractResumeBreakPointManager.java    | 48 +++++++++++-----------
 .../position/resume/ResumeBreakPointManager.java   |  8 ++--
 .../job/position/utils/InventoryPositionUtil.java} | 43 +++++++++----------
 .../job/preparer/ShardingScalingJobPreparer.java   | 11 ++---
 .../job/preparer/resumer/SyncPositionResumer.java  | 29 ++++++-------
 .../splitter/InventoryDataTaskSplitter.java        | 14 ++++---
 .../core/job/preparer/utils/JobPrepareUtil.java    |  3 +-
 .../core/job/task/DefaultSyncTaskFactory.java      |  3 +-
 .../scaling/core/job/task/ScalingTask.java         |  5 ++-
 .../scaling/core/job/task/SyncTaskFactory.java     |  3 +-
 .../incremental/IncrementalDataScalingTask.java    |  8 +++-
 .../task/inventory/InventoryDataScalingTask.java   | 11 +++--
 .../inventory/InventoryDataScalingTaskGroup.java   | 13 +++---
 .../scaling/core/spi/ScalingEntry.java             |  3 +-
 .../core/config/RdbmsConfigurationTest.java        | 11 +++--
 .../core/fixture/FixtureH2ScalingEntry.java        |  3 +-
 .../preparer/resumer/SyncPositionResumerTest.java  |  4 +-
 .../splitter/InventoryDataTaskSplitterTest.java    |  9 ++--
 .../inventory/InventoryDataScalingTaskTest.java    |  4 +-
 .../scaling/mysql/MySQLBinlogDumper.java           |  2 +-
 .../scaling/mysql/MySQLScalingEntry.java           |  3 +-
 .../scaling/mysql/binlog/BinlogPosition.java       |  6 +--
 .../AbstractResumeBreakPointManagerTest.java       | 12 +++---
 .../scaling/postgresql/PostgreSQLScalingEntry.java |  3 +-
 .../scaling/postgresql/PostgreSQLWalDumper.java    |  2 +-
 .../scaling/postgresql/wal/WalPosition.java        |  3 +-
 41 files changed, 219 insertions(+), 203 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index a322495..fa25298 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.fixture;
 
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
@@ -37,7 +38,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
+    public Class<? extends PositionManager<IncrementalPosition>> getPositionManager() {
         return FixtureNopManager.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 60f0e46..42d47cb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -17,20 +17,34 @@
 
 package org.apache.shardingsphere.scaling.fixture;
 
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 
 import javax.sql.DataSource;
 
 @RequiredArgsConstructor
-public final class FixtureNopManager extends BasePositionManager<NopPosition> implements PositionManager<NopPosition> {
+public final class FixtureNopManager extends BasePositionManager<IncrementalPosition> implements PositionManager<IncrementalPosition> {
     
     private final DataSource dataSource;
     
     @Override
-    public NopPosition getPosition() {
-        return new NopPosition();
+    public IncrementalPosition getPosition() {
+        
+        return new IncrementalPosition() {
+            @Override
+            public int compareTo(final Position o) {
+                return 0;
+            }
+            
+            @Override
+            public JsonElement toJson() {
+                return new JsonObject();
+            }
+        };
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index 4468982..bb9549e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -44,6 +44,7 @@ public final class RdbmsConfiguration implements Cloneable {
     
     private String primaryKey;
     
+    @SuppressWarnings("rawtypes")
     private PositionManager positionManager;
     
     private Integer spiltNum;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
index 9d9b97d..f338b4f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -35,6 +35,7 @@ public final class RdbmsConfigurationUtil {
      * @param rdbmsConfiguration rdbms configuration
      * @return SQL where condition
      */
+    @SuppressWarnings("unchecked")
     public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) {
         return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager());
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
index 5c39a98..e94fa58 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 /**
@@ -27,7 +28,7 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
  */
 @Setter
 @Getter
-public abstract class AbstractShardingScalingExecutor implements ShardingScalingExecutor {
+public abstract class AbstractShardingScalingExecutor<T extends Position> implements ShardingScalingExecutor {
     
     @Setter(AccessLevel.PROTECTED)
     @Getter(AccessLevel.PROTECTED)
@@ -35,7 +36,7 @@ public abstract class AbstractShardingScalingExecutor implements ShardingScaling
     
     private String taskId;
     
-    private PositionManager positionManager;
+    private PositionManager<T> positionManager;
     
     @Override
     public void start() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index 1f88348..6da217a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
@@ -31,8 +32,10 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
 import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
@@ -47,7 +50,7 @@ import java.sql.SQLException;
  * Abstract JDBC dumper implement.
  */
 @Slf4j
-public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor implements JDBCDumper {
+public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor<InventoryPosition> implements JDBCDumper {
     
     @Getter(AccessLevel.PROTECTED)
     private final RdbmsConfiguration rdbmsConfiguration;
@@ -87,7 +90,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
             ResultSet rs = ps.executeQuery();
             ResultSetMetaData metaData = rs.getMetaData();
             while (isRunning() && rs.next()) {
-                DataRecord record = new DataRecord(newPrimaryKeyPosition(rs), metaData.getColumnCount());
+                DataRecord record = new DataRecord(newInventoryPosition(rs), metaData.getColumnCount());
                 record.setType("BOOTSTRAP-INSERT");
                 record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
                 for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -95,7 +98,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
                 }
                 pushRecord(record);
             }
-            pushRecord(new FinishedRecord(new PrimaryKeyPosition.FinishedPosition()));
+            pushRecord(new FinishedRecord(new FinishedPosition()));
         } catch (final SQLException ex) {
             stop();
             channel.close();
@@ -105,9 +108,9 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
         }
     }
     
-    private PrimaryKeyPosition newPrimaryKeyPosition(final ResultSet rs) throws SQLException {
+    private InventoryPosition newInventoryPosition(final ResultSet rs) throws SQLException {
         if (null == rdbmsConfiguration.getPrimaryKey()) {
-            return new PrimaryKeyPosition.PlaceholderPosition();
+            return new PlaceholderPosition();
         }
         return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue());
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index cd268df..4863b8f 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -43,7 +44,7 @@ import java.util.List;
  * Abstract JDBC importer implementation.
  */
 @Slf4j
-public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor implements Importer {
+public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
     
     private final RdbmsConfiguration rdbmsConfiguration;
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
index 2d03acc..aec61ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.LinkedList;
@@ -42,9 +44,9 @@ public final class ShardingScalingJob {
     
     private final transient List<SyncConfiguration> syncConfigurations = new LinkedList<>();
     
-    private final transient List<ScalingTask> inventoryDataTasks = new LinkedList<>();
+    private final transient List<ScalingTask<InventoryPosition>> inventoryDataTasks = new LinkedList<>();
     
-    private final transient List<ScalingTask> incrementalDataTasks = new LinkedList<>();
+    private final transient List<ScalingTask<IncrementalPosition>> incrementalDataTasks = new LinkedList<>();
     
     private final String jobName;
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
similarity index 72%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
index 67e411f..212f168 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
@@ -17,12 +17,21 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
 /**
- * Primary key position manager.
+ * Finished position.
  */
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class FinishedPosition implements InventoryPosition {
+    
+    @Override
+    public JsonElement toJson() {
+        return new JsonObject();
+    }
     
-    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
-        super(position);
+    @Override
+    public int compareTo(final Position o) {
+        return 0;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
similarity index 75%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
index 67e411f..b1048cb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
@@ -18,11 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.position;
 
 /**
- * Primary key position manager.
+ * Incremental position interface.
  */
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
-    
-    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
-        super(position);
-    }
+public interface IncrementalPosition extends Position {
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
similarity index 75%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
index 67e411f..aa956a7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
@@ -18,11 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.position;
 
 /**
- * Primary key position manager.
+ * Inventory position interface.
  */
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
-    
-    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
-        super(position);
-    }
+public interface InventoryPosition extends Position {
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
similarity index 78%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
index 67e411f..dc54746 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.scaling.core.job.position;
 
 /**
- * Primary key position manager.
+ * Inventory position manager.
  */
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class InventoryPositionManager<T extends InventoryPosition> extends BasePositionManager<T> implements PositionManager<T> {
     
-    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
+    public InventoryPositionManager(final T position) {
         super(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
similarity index 72%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
index 67e411f..416f594 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
@@ -17,12 +17,21 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
 /**
- * Primary key position manager.
+ * Placeholder position.
  */
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class PlaceholderPosition implements InventoryPosition {
+    
+    @Override
+    public JsonElement toJson() {
+        return new JsonArray();
+    }
     
-    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
-        super(position);
+    @Override
+    public int compareTo(final Position o) {
+        return 0;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
index fe0ed84..5466876 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
@@ -38,10 +38,11 @@ public final class PositionManagerFactory {
      * @param dataSource data source
      * @return position manager
      */
+    @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
-    public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
+    public static PositionManager<IncrementalPosition> newInstance(final String databaseType, final DataSource dataSource) {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+        return (PositionManager<IncrementalPosition>) scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
     }
     
     /**
@@ -51,9 +52,10 @@ public final class PositionManagerFactory {
      * @param position position
      * @return position manager
      */
+    @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
-    public static PositionManager newInstance(final String databaseType, final String position) {
+    public static PositionManager<IncrementalPosition> newInstance(final String databaseType, final String position) {
         ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
-        return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
+        return (PositionManager<IncrementalPosition>) scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 8ac8736..d3eee8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -24,8 +24,6 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 
-import java.util.List;
-
 /**
  * Use primary key as position.
  */
@@ -33,7 +31,7 @@ import java.util.List;
 @AllArgsConstructor
 @Getter
 @Setter
-public class PrimaryKeyPosition implements Position {
+public class PrimaryKeyPosition implements InventoryPosition {
     
     private static final Gson GSON = new Gson();
     
@@ -49,40 +47,8 @@ public class PrimaryKeyPosition implements Position {
         return Long.compare(beginValue, ((PrimaryKeyPosition) position).beginValue);
     }
     
-    /**
-     * Transform primary key position from json to object.
-     *
-     * @param json json data
-     * @return primary key position
-     */
-    @SuppressWarnings("unchecked")
-    public static PrimaryKeyPosition fromJson(final String json) {
-        List<Double> values = GSON.fromJson(json, List.class);
-        if (2 == values.size()) {
-            return new PrimaryKeyPosition(values.get(0).longValue(), values.get(1).longValue());
-        }
-        return new PlaceholderPosition();
-    }
-    
     @Override
     public JsonElement toJson() {
         return GSON.toJsonTree(new long[]{beginValue, endValue});
     }
-    
-    /**
-     * Finish flag position for inventory task finished.
-     */
-    public static class FinishedPosition extends PrimaryKeyPosition {
-    }
-    
-    /**
-     * Placeholder position for without primary key table.
-     */
-    public static class PlaceholderPosition extends PrimaryKeyPosition {
-        
-        @Override
-        public JsonElement toJson() {
-            return GSON.toJsonTree(new long[0]);
-        }
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index d5aa51b..206285a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,11 +26,13 @@ import com.google.gson.JsonParser;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.utils.InventoryPositionUtil;
 
 import java.io.Closeable;
 import java.util.Map;
@@ -52,9 +54,9 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
     
     private static final String FINISHED = "finished";
     
-    private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
+    private final Map<String, PositionManager<InventoryPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
     
-    private final Map<String, PositionManager<Position>> incrementalPositionManagerMap = Maps.newConcurrentMap();
+    private final Map<String, PositionManager<IncrementalPosition>> incrementalPositionManagerMap = Maps.newConcurrentMap();
     
     private boolean resumable;
     
@@ -75,23 +77,22 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
             return;
         }
         log.info("resume inventory position from {} = {}", taskPath, data);
-        InventoryPosition inventoryPosition = InventoryPosition.fromJson(data);
-        Map<String, PrimaryKeyPosition> unfinished = inventoryPosition.getUnfinished();
-        for (Entry<String, PrimaryKeyPosition> entry : unfinished.entrySet()) {
-            inventoryPositionManagerMap.put(entry.getKey(), new PrimaryKeyPositionManager(entry.getValue()));
+        InventoryPositions inventoryPositions = InventoryPositions.fromJson(data);
+        Map<String, InventoryPosition> unfinished = inventoryPositions.getUnfinished();
+        for (Entry<String, InventoryPosition> entry : unfinished.entrySet()) {
+            inventoryPositionManagerMap.put(entry.getKey(), new InventoryPositionManager<>(entry.getValue()));
         }
-        for (String each : inventoryPosition.getFinished()) {
-            inventoryPositionManagerMap.put(each, new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+        for (String each : inventoryPositions.getFinished()) {
+            inventoryPositionManagerMap.put(each, new InventoryPositionManager<>(new FinishedPosition()));
         }
     }
     
-    @SuppressWarnings("unchecked")
     protected void resumeIncrementalPosition(final String data) {
         if (Strings.isNullOrEmpty(data)) {
             return;
         }
         log.info("resume incremental position from {} = {}", taskPath, data);
-        Map<String, Object> incrementalPosition = GSON.fromJson(data, Map.class);
+        Map<String, Object> incrementalPosition = GSON.<Map<String, Object>>fromJson(data, Map.class);
         for (Entry<String, Object> entry : incrementalPosition.entrySet()) {
             getIncrementalPositionManagerMap().put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
         }
@@ -101,8 +102,8 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
         JsonObject result = new JsonObject();
         JsonObject unfinished = new JsonObject();
         Set<String> finished = Sets.newHashSet();
-        for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : inventoryPositionManagerMap.entrySet()) {
-            if (entry.getValue().getPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+        for (Entry<String, PositionManager<InventoryPosition>> entry : inventoryPositionManagerMap.entrySet()) {
+            if (entry.getValue().getPosition() instanceof FinishedPosition) {
                 finished.add(entry.getKey());
                 continue;
             }
@@ -115,7 +116,7 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
     
     protected String getIncrementalPositionData() {
         JsonObject result = new JsonObject();
-        for (Entry<String, PositionManager<Position>> entry : incrementalPositionManagerMap.entrySet()) {
+        for (Entry<String, PositionManager<IncrementalPosition>> entry : incrementalPositionManagerMap.entrySet()) {
             result.add(entry.getKey(), entry.getValue().getPosition().toJson());
         }
         return result.toString();
@@ -127,9 +128,9 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
     
     @Getter
     @Setter
-    private static final class InventoryPosition {
+    private static final class InventoryPositions {
         
-        private Map<String, PrimaryKeyPosition> unfinished;
+        private Map<String, InventoryPosition> unfinished;
         
         private Set<String> finished;
         
@@ -139,13 +140,12 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
          * @param data json data
          * @return inventory position
          */
-        @SuppressWarnings("unchecked")
-        public static InventoryPosition fromJson(final String data) {
-            InventoryPosition result = new InventoryPosition();
+        public static InventoryPositions fromJson(final String data) {
+            InventoryPositions result = new InventoryPositions();
             JsonObject json = JsonParser.parseString(data).getAsJsonObject();
-            Map<String, Object> unfinished = GSON.fromJson(json.getAsJsonObject(UNFINISHED), Map.class);
-            result.setUnfinished(unfinished.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> PrimaryKeyPosition.fromJson(entry.getValue().toString()))));
-            result.setFinished(GSON.fromJson(json.getAsJsonArray(FINISHED), Set.class));
+            Map<String, Object> unfinished = GSON.<Map<String, Object>>fromJson(json.getAsJsonObject(UNFINISHED), Map.class);
+            result.setUnfinished(unfinished.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> InventoryPositionUtil.fromJson(entry.getValue().toString()))));
+            result.setFinished(GSON.<Set<String>>fromJson(json.getAsJsonArray(FINISHED), Set.class));
             return result;
         }
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
index c33e90c..65b9acd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 
 import java.util.Map;
 
@@ -40,14 +40,14 @@ public interface ResumeBreakPointManager {
      *
      * @return inventory position map
      */
-    Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionManagerMap();
+    Map<String, PositionManager<InventoryPosition>> getInventoryPositionManagerMap();
     
     /**
      * Get incremental position map.
      *
      * @return incremental position map
      */
-    Map<String, PositionManager<Position>> getIncrementalPositionManagerMap();
+    Map<String, PositionManager<IncrementalPosition>> getIncrementalPositionManagerMap();
     
     /**
      * Persist inventory position.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
similarity index 51%
copy from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
index 9b56018..0c6bbae 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
@@ -15,38 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.wal;
+package org.apache.shardingsphere.scaling.core.job.position.utils;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+import java.util.List;
 
 /**
- * PostgreSQL wal position.
+ * Inventory position util.
  */
-@RequiredArgsConstructor
-@Getter
-public final class WalPosition implements Position {
+public final class InventoryPositionUtil {
     
     private static final Gson GSON = new Gson();
     
-    private final LogSequenceNumber logSequenceNumber;
-    
-    @Override
-    public int compareTo(final Position position) {
-        if (null == position) {
-            return 1;
+    /**
+     * Transform primary key position from json to object.
+     *
+     * @param json json data
+     * @return primary key position
+     */
+    public static InventoryPosition fromJson(final String json) {
+        List<Double> values = GSON.<List<Double>>fromJson(json, List.class);
+        if (2 == values.size()) {
+            return new PrimaryKeyPosition(values.get(0).longValue(), values.get(1).longValue());
         }
-        long o1 = logSequenceNumber.asLong();
-        long o2 = ((WalPosition) position).logSequenceNumber.asLong();
-        return Long.compare(o1, o2);
-    }
-    
-    @Override
-    public JsonElement toJson() {
-        return GSON.toJsonTree(logSequenceNumber.asLong());
+        return new PlaceholderPosition();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 5262f8e..e2b4676 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
@@ -89,11 +91,11 @@ public final class ShardingScalingJobPreparer {
     }
     
     private void initInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
-        List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
+        List<ScalingTask<InventoryPosition>> allInventoryDataTasks = new LinkedList<>();
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
             allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each, dataSourceManager));
         }
-        for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+        for (Collection<ScalingTask<InventoryPosition>> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
             shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
         }
     }
@@ -106,9 +108,8 @@ public final class ShardingScalingJobPreparer {
         }
     }
     
-    @SuppressWarnings("rawtypes")
-    private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
-        PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
+    private PositionManager<? extends IncrementalPosition> instancePositionManager(final String databaseType, final DataSource dataSource) {
+        PositionManager<? extends IncrementalPosition> positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
         positionManager.getPosition();
         return positionManager;
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index 18361a5..9ed0da7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -21,8 +21,9 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
@@ -59,25 +60,25 @@ public final class SyncPositionResumer {
     }
     
     private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
-        List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
-        for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+        List<ScalingTask<InventoryPosition>> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
+        for (Collection<ScalingTask<InventoryPosition>> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
             shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
         }
     }
     
-    private List<ScalingTask> getAllInventoryDataTasks(
+    private List<ScalingTask<InventoryPosition>> getAllInventoryDataTasks(
             final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
-        List<ScalingTask> result = new LinkedList<>();
+        List<ScalingTask<InventoryPosition>> result = new LinkedList<>();
         for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
             MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
-            for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
+            for (Entry<String, PositionManager<InventoryPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
                 result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
             }
         }
         return result;
     }
     
-    private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<PrimaryKeyPosition>> entry) {
+    private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<InventoryPosition>> entry) {
         String[] splitTable = entry.getKey().split("#");
         RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
         splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
@@ -90,7 +91,7 @@ public final class SyncPositionResumer {
                 splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
     }
     
-    private Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionMap(
+    private Map<String, PositionManager<InventoryPosition>> getInventoryPositionMap(
             final RdbmsConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
         Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
         return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
@@ -116,8 +117,8 @@ public final class SyncPositionResumer {
         persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumeBreakPointManager);
     }
     
-    private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
-        for (ScalingTask each : inventoryDataTasks) {
+    private void persistInventoryPosition(final List<ScalingTask<InventoryPosition>> inventoryDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+        for (ScalingTask<InventoryPosition> each : inventoryDataTasks) {
             if (each instanceof InventoryDataScalingTaskGroup) {
                 putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumeBreakPointManager);
             }
@@ -125,14 +126,14 @@ public final class SyncPositionResumer {
         resumeBreakPointManager.persistInventoryPosition();
     }
     
-    private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumeBreakPointManager resumeBreakPointManager) {
-        for (ScalingTask each : scalingTasks) {
+    private void putInventoryDataScalingTask(final Collection<ScalingTask<InventoryPosition>> scalingTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+        for (ScalingTask<InventoryPosition> each : scalingTasks) {
             resumeBreakPointManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
         }
     }
     
-    private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
-        for (ScalingTask each : incrementalDataTasks) {
+    private void persistIncrementalPosition(final List<ScalingTask<IncrementalPosition>> incrementalDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+        for (ScalingTask<IncrementalPosition> each : incrementalDataTasks) {
             resumeBreakPointManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
         }
         resumeBreakPointManager.persistIncrementalPosition();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 8930d60..bb39253 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -22,8 +22,10 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
@@ -55,8 +57,8 @@ public final class InventoryDataTaskSplitter {
      * @param dataSourceManager data source manager
      * @return split inventory data task
      */
-    public Collection<ScalingTask> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
-        Collection<ScalingTask> result = new LinkedList<>();
+    public Collection<ScalingTask<InventoryPosition>> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
+        Collection<ScalingTask<InventoryPosition>> result = new LinkedList<>();
         for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) {
             result.add(syncTaskFactory.createInventoryDataSyncTask(each));
         }
@@ -82,7 +84,7 @@ public final class InventoryDataTaskSplitter {
         for (String each : syncConfiguration.getTableNameMap().keySet()) {
             RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
             dumperConfig.setTableName(each);
-            dumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
+            dumperConfig.setPositionManager(new InventoryPositionManager<>(new PlaceholderPosition()));
             result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
                 dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
         }
@@ -132,10 +134,10 @@ public final class InventoryDataTaskSplitter {
             for (int i = 0; i < concurrency && min <= max; i++) {
                 RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
                 if (i < concurrency - 1) {
-                    splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, min + step)));
+                    splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, min + step)));
                     min += step + 1;
                 } else {
-                    splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, max)));
+                    splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, max)));
                 }
                 splitDumperConfig.setSpiltNum(i);
                 result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
index c6daa5e..d4d4f94 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.job.preparer.utils;
 import com.google.common.collect.Lists;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.List;
@@ -37,7 +38,7 @@ public final class JobPrepareUtil {
      * @param allInventoryDataTasks all inventory data tasks
      * @return task group list
      */
-    public static List<List<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+    public static List<List<ScalingTask<InventoryPosition>>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask<InventoryPosition>> allInventoryDataTasks) {
         return Lists.partition(allInventoryDataTasks, taskNumber);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
index f9817b2..6972d8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.task;
 
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
@@ -30,7 +31,7 @@ import java.util.Collection;
 public final class DefaultSyncTaskFactory implements SyncTaskFactory {
     
     @Override
-    public InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(final Collection<ScalingTask> inventoryDataScalingTasks) {
+    public InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(final Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks) {
         return new InventoryDataScalingTaskGroup(inventoryDataScalingTasks);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 66ba771..39b29c2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,12 +19,13 @@ package org.apache.shardingsphere.scaling.core.job.task;
 
 import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 
 /**
  * Sync task interface.
  */
-public interface ScalingTask extends ShardingScalingExecutor {
+public interface ScalingTask<T extends Position> extends ShardingScalingExecutor {
     
     /**
      * Get synchronize progress.
@@ -38,7 +39,7 @@ public interface ScalingTask extends ShardingScalingExecutor {
      *
      * @return position manager
      */
-    PositionManager getPositionManager();
+    PositionManager<T> getPositionManager();
     
     /**
      * Get task id.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
index 9df782c..e77a331 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.task;
 
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
@@ -35,7 +36,7 @@ public interface SyncTaskFactory {
      * @param inventoryDataScalingTasks inventory data sync tasks
      * @return inventory data sync task group
      */
-    InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask> inventoryDataScalingTasks);
+    InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks);
     
     /**
      * Create inventory data sync task.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 3c89c3d..8979cf6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.ArrayList;
@@ -43,7 +44,7 @@ import java.util.concurrent.Future;
  * Incremental data execute task.
  */
 @Slf4j
-public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor<IncrementalPosition> implements ScalingTask<IncrementalPosition> {
     
     private final SyncConfiguration syncConfiguration;
     
@@ -53,6 +54,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     
     private long delayMillisecond;
     
+    @SuppressWarnings("unchecked")
     public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) {
         this.syncConfiguration = syncConfiguration;
         dataSourceManager = new DataSourceManager();
@@ -94,7 +96,9 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     private void instanceChannel(final Collection<Importer> importers) {
         DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            getPositionManager().setPosition(lastHandledRecord.getPosition());
+            if (lastHandledRecord.getPosition() instanceof IncrementalPosition) {
+                getPositionManager().setPosition((IncrementalPosition) lastHandledRecord.getPosition());
+            }
             delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
         });
         dumper.setChannel(channel);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 021ba04..ee09470 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import javax.sql.DataSource;
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Table slice execute task.
  */
 @Slf4j
-public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
     
     private final SyncConfiguration syncConfiguration;
     
@@ -66,6 +66,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
         this(syncConfiguration, new DataSourceManager());
     }
     
+    @SuppressWarnings("unchecked")
     public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
         this.syncConfiguration = syncConfiguration;
         this.dataSourceManager = dataSourceManager;
@@ -126,10 +127,8 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
             for (Record record : records) {
                 if (record instanceof DataRecord) {
                     count++;
-                } else if (record instanceof FinishedRecord) {
-                    if (record.getPosition() instanceof PrimaryKeyPosition) {
-                        getPositionManager().setPosition(record.getPosition());
-                    }
+                } else if (record instanceof FinishedRecord && record.getPosition() instanceof InventoryPosition) {
+                    getPositionManager().setPosition((InventoryPosition) record.getPosition());
                 }
             }
             syncedRows.addAndGet(count);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
index 80eefed..bae3d2b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 
 import java.util.Collection;
@@ -30,25 +31,25 @@ import java.util.Collection;
  */
 @Slf4j
 @Getter
-public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
     
-    private final Collection<ScalingTask> scalingTasks;
+    private final Collection<ScalingTask<InventoryPosition>> scalingTasks;
     
-    public InventoryDataScalingTaskGroup(final Collection<ScalingTask> inventoryDataScalingTasks) {
+    public InventoryDataScalingTaskGroup(final Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks) {
         scalingTasks = inventoryDataScalingTasks;
     }
     
     @Override
     public void start() {
         super.start();
-        for (ScalingTask each : scalingTasks) {
+        for (ScalingTask<InventoryPosition> each : scalingTasks) {
             each.start();
         }
     }
     
     @Override
     public void stop() {
-        for (ScalingTask each : scalingTasks) {
+        for (ScalingTask<InventoryPosition> each : scalingTasks) {
             each.stop();
         }
     }
@@ -56,7 +57,7 @@ public final class InventoryDataScalingTaskGroup extends AbstractShardingScaling
     @Override
     public SyncProgress getProgress() {
         InventoryDataSyncTaskProgressGroup result = new InventoryDataSyncTaskProgressGroup();
-        for (ScalingTask each : scalingTasks) {
+        for (ScalingTask<InventoryPosition> each : scalingTasks) {
             result.addSyncProgress(each.getProgress());
         }
         return result;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 4e14486..5d3fdc6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.spi;
 
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
@@ -48,7 +49,7 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
      *
      * @return position manager type
      */
-    Class<? extends PositionManager> getPositionManager();
+    Class<? extends PositionManager<? extends IncrementalPosition>> getPositionManager();
     
     /**
      * Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
index 66c0774..2b7081f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
@@ -18,14 +18,13 @@
 package org.apache.shardingsphere.scaling.core.config;
 
 import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 public final class RdbmsConfigurationTest {
     
@@ -33,9 +32,9 @@ public final class RdbmsConfigurationTest {
     public void assertClone() {
         RdbmsConfiguration origin = new RdbmsConfiguration();
         RdbmsConfiguration clone = RdbmsConfiguration.clone(origin);
-        assertTrue(origin.equals(clone));
+        assertThat(clone, is(origin));
         origin.setTableName("t1");
-        assertFalse(origin.equals(clone));
+        assertNotSame(origin, clone);
     }
     
     @Test
@@ -43,7 +42,7 @@ public final class RdbmsConfigurationTest {
         RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration();
         assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is(""));
         rdbmsConfiguration.setPrimaryKey("id");
-        rdbmsConfiguration.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 10)));
+        rdbmsConfiguration.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(0, 10)));
         assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 19f1f95..b856a2d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.fixture;
 
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
@@ -37,7 +38,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
+    public Class<? extends PositionManager<IncrementalPosition>> getPositionManager() {
         return null;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index b975366..f537ec1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
 import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
 import org.junit.Before;
@@ -67,7 +67,7 @@ public final class SyncPositionResumerTest {
     
     @Test
     public void assertResumePosition() {
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new InventoryPositionManager(new PrimaryKeyPosition(0, 100)));
         resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new BasePositionManager<>());
         syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumeBreakPointManager);
         assertThat(shardingScalingJob.getIncrementalDataTasks().size(), is(1));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index f709925..d46326a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.junit.After;
 import org.junit.Before;
@@ -73,7 +74,7 @@ public class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
         initIntPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
-        Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+        Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(3));
     }
@@ -81,7 +82,7 @@ public class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
-        Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+        Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -89,7 +90,7 @@ public class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
-        Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+        Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -97,7 +98,7 @@ public class InventoryDataTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
-        Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+        Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index a1ffd8c..1c0189c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,7 +96,7 @@ public final class InventoryDataScalingTaskTest {
         RdbmsConfiguration result = new RdbmsConfiguration();
         result.setDataSourceConfiguration(dataSourceConfiguration);
         result.setTableName("t_order");
-        result.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(1, 100)));
+        result.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(1, 100)));
         return result;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index 9e002a4..4ba8fcf 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -54,7 +54,7 @@ import java.util.Random;
  * MySQL binlog dumper.
  */
 @Slf4j
-public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor implements LogDumper {
+public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<BinlogPosition> implements LogDumper {
     
     private final BinlogPosition binlogPosition;
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 29dfb1b..e00a12f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 
 /**
  * MySQL scaling entry.
@@ -40,7 +41,7 @@ public final class MySQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
+    public Class<? extends PositionManager<BinlogPosition>> getPositionManager() {
         return MySQLPositionManager.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index e59dbd1..40801c6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -21,12 +21,12 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.annotations.Expose;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 /**
  * Binlog Position.
@@ -35,7 +35,7 @@ import lombok.Setter;
 @RequiredArgsConstructor
 @Setter
 @Getter
-public class BinlogPosition implements Position {
+public class BinlogPosition implements IncrementalPosition {
     
     private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 7315553..89eeb76 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -18,8 +18,10 @@
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
 import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
@@ -70,10 +72,10 @@ public final class AbstractResumeBreakPointManagerTest {
     
     @Test
     public void assertGetInventoryPositionData() {
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 100L)));
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
-        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 200L)));
+        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 100L)));
+        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new InventoryPositionManager<>(new FinishedPosition()));
+        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new InventoryPositionManager<>(new PlaceholderPosition()));
+        resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 200L)));
         assertThat(resumeBreakPointManager.getInventoryPositionData(), is(inventoryPosition));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index e70aaba..8c4bc2f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 
 /**
  * PostgreSQL scaling entry.
@@ -40,7 +41,7 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionManager> getPositionManager() {
+    public Class<? extends PositionManager<WalPosition>> getPositionManager() {
         return PostgreSQLPositionManager.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 3186a8a..730c703 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -43,7 +43,7 @@ import java.sql.SQLException;
 /**
  * PostgreSQL WAL dumper.
  */
-public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor implements LogDumper {
+public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<WalPosition> implements LogDumper {
     
     private final WalPosition walPosition;
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 9b56018..f5c6f60 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -29,7 +30,7 @@ import org.postgresql.replication.LogSequenceNumber;
  */
 @RequiredArgsConstructor
 @Getter
-public final class WalPosition implements Position {
+public final class WalPosition implements IncrementalPosition {
     
     private static final Gson GSON = new Gson();