You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/06 09:07:43 UTC

[shardingsphere] branch master updated: Remove Position generics and Serializable declare. (#6663)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ffa6760  Remove Position generics and Serializable declare. (#6663)
ffa6760 is described below

commit ffa6760b0154fae54c7e7db2dbfe4c297ac883ac
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Aug 6 17:07:30 2020 +0800

    Remove Position generics and Serializable declare. (#6663)
    
    * Remove Position generics and Serializable declare.
    
    * Remove Position generics and Serializable declare.
    
    * add final keyword
    
    * add blank line.
    
    * remove blank line.
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/fixture/FixtureNopManager.java         |  9 ++--
 .../core/config/utils/RdbmsConfigurationUtil.java  |  2 +-
 .../executor/dumper/AbstractJDBCDumper.java        |  2 +-
 .../{Position.java => BasePositionManager.java}    | 25 ++++++-----
 .../scaling/core/job/position/NopPosition.java     |  6 +--
 .../scaling/core/job/position/Position.java        |  5 +--
 .../scaling/core/job/position/PositionManager.java | 10 ++---
 .../core/job/position/PrimaryKeyPosition.java      |  4 +-
 .../job/position/PrimaryKeyPositionManager.java    | 17 ++-----
 .../resume/AbstractResumeBreakPointManager.java    | 11 ++---
 .../position/resume/ResumeBreakPointManager.java   |  3 +-
 .../job/preparer/ShardingScalingJobPreparer.java   |  2 +-
 .../incremental/IncrementalDataScalingTask.java    |  6 +--
 .../task/inventory/InventoryDataScalingTask.java   |  2 +-
 .../preparer/resumer/SyncPositionResumerTest.java  | 19 +-------
 .../scaling/mysql/MySQLPositionManager.java        | 47 ++++++++++---------
 .../scaling/mysql/binlog/BinlogPosition.java       | 10 ++---
 .../AbstractResumeBreakPointManagerTest.java       |  7 +--
 .../scaling/mysql/MySQLPositionManagerTest.java    |  6 +--
 .../postgresql/PostgreSQLPositionManager.java      | 52 +++++++++++-----------
 .../scaling/postgresql/PostgreSQLWalDumper.java    |  3 +-
 .../scaling/postgresql/wal/WalPosition.java        | 10 ++---
 .../postgresql/PostgreSQLPositionManagerTest.java  | 10 ++---
 23 files changed, 118 insertions(+), 150 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 0a30ccc..60f0e46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -18,22 +18,19 @@
 package org.apache.shardingsphere.scaling.fixture;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 
 import javax.sql.DataSource;
 
 @RequiredArgsConstructor
-public final class FixtureNopManager implements PositionManager<NopPosition> {
+public final class FixtureNopManager extends BasePositionManager<NopPosition> implements PositionManager<NopPosition> {
     
     private final DataSource dataSource;
     
     @Override
-    public NopPosition getCurrentPosition() {
+    public NopPosition getPosition() {
         return new NopPosition();
     }
-    
-    @Override
-    public void updateCurrentPosition(final NopPosition newPosition) {
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
index 074fe71..9d9b97d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -43,7 +43,7 @@ public final class RdbmsConfigurationUtil {
         if (null == primaryKey || null == positionManager) {
             return "";
         }
-        PrimaryKeyPosition position = positionManager.getCurrentPosition();
+        PrimaryKeyPosition position = positionManager.getPosition();
         return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index c7fa660..1f88348 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -109,7 +109,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
         if (null == rdbmsConfiguration.getPrimaryKey()) {
             return new PrimaryKeyPosition.PlaceholderPosition();
         }
-        return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getCurrentPosition()).getEndValue());
+        return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue());
     }
     
     protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
similarity index 72%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
index c6f3eb2..578232a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
@@ -17,20 +17,21 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import com.google.gson.JsonElement;
-
-import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 
 /**
- * Position interface.
+ * Base position manager.
+ *
+ * @param <T> Position
  */
-// TODO check Serializable is needed
-public interface Position<T> extends Comparable<T>, Serializable {
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class BasePositionManager<T extends Position> implements PositionManager<T> {
     
-    /**
-     * To json element.
-     *
-     * @return json element
-     */
-    JsonElement toJson();
+    private T position;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
index 850c276..1947edb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
@@ -23,12 +23,10 @@ import com.google.gson.JsonObject;
 /**
  * Nop position.
  */
-public final class NopPosition implements Position<NopPosition> {
-    
-    private static final long serialVersionUID = 1946907178847169020L;
+public final class NopPosition implements Position {
     
     @Override
-    public int compareTo(final NopPosition nopPosition) {
+    public int compareTo(final Position position) {
         return 0;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
index c6f3eb2..2d6801d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.scaling.core.job.position;
 
 import com.google.gson.JsonElement;
 
-import java.io.Serializable;
-
 /**
  * Position interface.
  */
-// TODO check Serializable is needed
-public interface Position<T> extends Comparable<T>, Serializable {
+public interface Position extends Comparable<Position> {
     
     /**
      * To json element.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
index fac5976..181c86f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
@@ -25,16 +25,16 @@ package org.apache.shardingsphere.scaling.core.job.position;
 public interface PositionManager<T extends Position> {
     
     /**
-     * Get current position.
+     * Get position.
      *
      * @return position
      */
-    T getCurrentPosition();
+    T getPosition();
     
     /**
-     * Update currentPosition.
+     * Set Position.
      *
-     * @param newPosition new position.
+     * @param position position.
      */
-    void updateCurrentPosition(T newPosition);
+    void setPosition(T position);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 717378a..8ac8736 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -35,8 +35,6 @@ import java.util.List;
 @Setter
 public class PrimaryKeyPosition implements Position {
     
-    private static final long serialVersionUID = 8101879950564531329L;
-    
     private static final Gson GSON = new Gson();
     
     private long beginValue;
@@ -44,7 +42,7 @@ public class PrimaryKeyPosition implements Position {
     private long endValue;
     
     @Override
-    public int compareTo(final Object position) {
+    public int compareTo(final Position position) {
         if (null == position) {
             return 1;
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
index b101aaf..67e411f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
@@ -17,23 +17,12 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import lombok.AllArgsConstructor;
-
 /**
  * Primary key position manager.
  */
-@AllArgsConstructor
-public final class PrimaryKeyPositionManager implements PositionManager<PrimaryKeyPosition> {
-    
-    private PrimaryKeyPosition position;
-    
-    @Override
-    public PrimaryKeyPosition getCurrentPosition() {
-        return position;
-    }
+public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
     
-    @Override
-    public void updateCurrentPosition(final PrimaryKeyPosition newPosition) {
-        position = newPosition;
+    public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
+        super(position);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index 60b515b..d5aa51b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,6 +26,7 @@ import com.google.gson.JsonParser;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -53,7 +54,7 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
     
     private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
     
-    private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
+    private final Map<String, PositionManager<Position>> incrementalPositionManagerMap = Maps.newConcurrentMap();
     
     private boolean resumable;
     
@@ -101,11 +102,11 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
         JsonObject unfinished = new JsonObject();
         Set<String> finished = Sets.newHashSet();
         for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : inventoryPositionManagerMap.entrySet()) {
-            if (entry.getValue().getCurrentPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+            if (entry.getValue().getPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
                 finished.add(entry.getKey());
                 continue;
             }
-            unfinished.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+            unfinished.add(entry.getKey(), entry.getValue().getPosition().toJson());
         }
         result.add(UNFINISHED, unfinished);
         result.add(FINISHED, GSON.toJsonTree(finished));
@@ -114,8 +115,8 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
     
     protected String getIncrementalPositionData() {
         JsonObject result = new JsonObject();
-        for (Entry<String, PositionManager> entry : incrementalPositionManagerMap.entrySet()) {
-            result.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+        for (Entry<String, PositionManager<Position>> entry : incrementalPositionManagerMap.entrySet()) {
+            result.add(entry.getKey(), entry.getValue().getPosition().toJson());
         }
         return result.toString();
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
index e468022..c33e90c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 
@@ -46,7 +47,7 @@ public interface ResumeBreakPointManager {
      *
      * @return incremental position map
      */
-    Map<String, PositionManager> getIncrementalPositionManagerMap();
+    Map<String, PositionManager<Position>> getIncrementalPositionManagerMap();
     
     /**
      * Persist inventory position.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 1822f92..5262f8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -109,7 +109,7 @@ public final class ShardingScalingJobPreparer {
     @SuppressWarnings("rawtypes")
     private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
         PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
-        positionManager.getCurrentPosition();
+        positionManager.getPosition();
         return positionManager;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 094532d..3c89c3d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -63,7 +63,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     @Override
     public void start() {
         syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
-        dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getCurrentPosition());
+        dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getPosition());
         Collection<Importer> importers = instanceImporters();
         instanceChannel(importers);
         Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
@@ -94,7 +94,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     private void instanceChannel(final Collection<Importer> importers) {
         DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            getPositionManager().updateCurrentPosition(lastHandledRecord.getPosition());
+            getPositionManager().setPosition(lastHandledRecord.getPosition());
             delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
         });
         dumper.setChannel(channel);
@@ -122,7 +122,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
     
     @Override
     public SyncProgress getProgress() {
-        return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getCurrentPosition());
+        return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getPosition());
     }
     
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 5745545..021ba04 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -128,7 +128,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
                     count++;
                 } else if (record instanceof FinishedRecord) {
                     if (record.getPosition() instanceof PrimaryKeyPosition) {
-                        getPositionManager().updateCurrentPosition(record.getPosition());
+                        getPositionManager().setPosition(record.getPosition());
                     }
                 }
             }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 2abcf50..b975366 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -25,8 +25,7 @@ import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
@@ -69,7 +68,7 @@ public final class SyncPositionResumerTest {
     @Test
     public void assertResumePosition() {
         resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
+        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new BasePositionManager<>());
         syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumeBreakPointManager);
         assertThat(shardingScalingJob.getIncrementalDataTasks().size(), is(1));
         assertTrue(shardingScalingJob.getInventoryDataTasks().isEmpty());
@@ -83,20 +82,6 @@ public final class SyncPositionResumerTest {
         verify(resumeBreakPointManager).persistInventoryPosition();
     }
     
-    private PositionManager mockPositionManager() {
-        return new PositionManager() {
-            
-            @Override
-            public Position getCurrentPosition() {
-                return null;
-            }
-            
-            @Override
-            public void updateCurrentPosition(final Position newPosition) {
-            }
-        };
-    }
-    
     private SyncConfiguration mockSyncConfiguration() {
         RdbmsConfiguration dumperConfig = mockDumperConfig();
         RdbmsConfiguration importerConfig = new RdbmsConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
index 574b7e6..a5435f4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.mysql;
 
 import com.google.gson.Gson;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 
@@ -30,47 +31,53 @@ import java.sql.SQLException;
 /**
  * MySQL position manager, based on binlog mechanism.
  */
-public final class MySQLPositionManager implements PositionManager<BinlogPosition> {
+public final class MySQLPositionManager extends BasePositionManager<BinlogPosition> implements PositionManager<BinlogPosition> {
     
     private static final Gson GSON = new Gson();
     
     private DataSource dataSource;
     
-    private BinlogPosition currentPosition;
-    
     public MySQLPositionManager(final DataSource dataSource) {
         this.dataSource = dataSource;
     }
     
     public MySQLPositionManager(final String position) {
-        currentPosition = GSON.fromJson(position, BinlogPosition.class);
+        setPosition(GSON.fromJson(position, BinlogPosition.class));
     }
     
     @Override
-    public BinlogPosition getCurrentPosition() {
-        if (null == currentPosition) {
-            getCurrentPositionFromSource();
+    public BinlogPosition getPosition() {
+        BinlogPosition position = super.getPosition();
+        if (null != position) {
+            return position;
         }
-        return currentPosition;
+        initPosition();
+        return super.getPosition();
     }
     
-    private void getCurrentPositionFromSource() {
+    private void initPosition() {
         try (Connection connection = dataSource.getConnection()) {
-            PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
-            ResultSet resultSet = preparedStatement.executeQuery();
-            resultSet.next();
-            currentPosition = new BinlogPosition(resultSet.getString(1), resultSet.getLong(2));
-            preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
-            resultSet = preparedStatement.executeQuery();
-            resultSet.next();
-            currentPosition.setServerId(resultSet.getLong(2));
+            BinlogPosition binlogPosition = getBinlogPosition(connection);
+            binlogPosition.setServerId(getServerId(connection));
+            setPosition(binlogPosition);
         } catch (final SQLException ex) {
             throw new RuntimeException("markPosition error", ex);
         }
     }
     
-    @Override
-    public void updateCurrentPosition(final BinlogPosition newPosition) {
-        currentPosition = newPosition;
+    private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
+        try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            resultSet.next();
+            return new BinlogPosition(resultSet.getString(1), resultSet.getLong(2));
+        }
+    }
+    
+    private long getServerId(final Connection connection) throws SQLException {
+        try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            resultSet.next();
+            return resultSet.getLong(2);
+        }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index 6c94772..e59dbd1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -35,9 +35,7 @@ import lombok.Setter;
 @RequiredArgsConstructor
 @Setter
 @Getter
-public class BinlogPosition implements Position<BinlogPosition> {
-    
-    private static final long serialVersionUID = -4917415481787093677L;
+public class BinlogPosition implements Position {
     
     private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
     
@@ -50,12 +48,12 @@ public class BinlogPosition implements Position<BinlogPosition> {
     private long serverId;
     
     @Override
-    public final int compareTo(final BinlogPosition binlogPosition) {
-        if (null == binlogPosition) {
+    public final int compareTo(final Position position) {
+        if (null == position) {
             return 1;
         }
         long o1 = toLong();
-        long o2 = binlogPosition.toLong();
+        long o2 = ((BinlogPosition) position).toLong();
         return Long.compare(o1, o2);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 443abfd..7315553 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.shardingsphere.scaling.core.job.position.resume;
 
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.mysql.MySQLPositionManager;
 import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,8 +63,8 @@ public final class AbstractResumeBreakPointManagerTest {
     
     @Test
     public void assertGetIncrementalPositionData() {
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
-        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
+        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new BasePositionManager<>(new BinlogPosition("mysql-bin.000001", 4L)));
+        resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new BasePositionManager<>(new BinlogPosition("mysql-bin.000002", 4L)));
         assertThat(resumeBreakPointManager.getIncrementalPositionData(), is(incrementalPosition));
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 49ab8b0..9ce18cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -62,7 +62,7 @@ public final class MySQLPositionManagerTest {
     @Test
     public void assertGetCurrentPosition() {
         MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
-        BinlogPosition actual = mysqlPositionManager.getCurrentPosition();
+        BinlogPosition actual = mysqlPositionManager.getPosition();
         assertThat(actual.getServerId(), is(SERVER_ID));
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
@@ -72,8 +72,8 @@ public final class MySQLPositionManagerTest {
     public void assertUpdateCurrentPosition() {
         MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
         BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID);
-        mysqlPositionManager.updateCurrentPosition(expected);
-        assertThat(mysqlPositionManager.getCurrentPosition(), is(expected));
+        mysqlPositionManager.setPosition(expected);
+        assertThat(mysqlPositionManager.getPosition(), is(expected));
     }
     
     private PreparedStatement mockPositionStatement() throws SQLException {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 08f828c..3d9d525 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
 import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.postgresql.replication.LogSequenceNumber;
@@ -31,7 +32,7 @@ import java.sql.SQLException;
 /**
  * PostgreSQL position manager.
  */
-public final class PostgreSQLPositionManager implements PositionManager<WalPosition> {
+public final class PostgreSQLPositionManager extends BasePositionManager<WalPosition> implements PositionManager<WalPosition> {
     
     public static final String SLOT_NAME = "sharding_scaling";
     
@@ -41,37 +42,36 @@ public final class PostgreSQLPositionManager implements PositionManager<WalPosit
     
     private DataSource dataSource;
     
-    private WalPosition currentPosition;
-    
     public PostgreSQLPositionManager(final DataSource dataSource) {
         this.dataSource = dataSource;
     }
     
     public PostgreSQLPositionManager(final String position) {
-        currentPosition = new WalPosition(LogSequenceNumber.valueOf(position));
+        setPosition(new WalPosition(LogSequenceNumber.valueOf(position)));
     }
     
     @Override
-    public WalPosition getCurrentPosition() {
-        if (null == currentPosition) {
-            getCurrentPositionFromSource();
+    public WalPosition getPosition() {
+        WalPosition position = super.getPosition();
+        if (null != position) {
+            return position;
         }
-        return currentPosition;
+        initPosition();
+        return super.getPosition();
     }
     
-    private void getCurrentPositionFromSource() {
+    private void initPosition() {
         try (Connection connection = dataSource.getConnection()) {
             // Need to create slot first, hold oldest wal event.
             createIfNotExists(connection);
-            currentPosition = getCurrentLsn(connection);
+            setPosition(getWalPosition(connection));
         } catch (final SQLException ex) {
             throw new RuntimeException("markPosition error", ex);
         }
     }
     
     private void createIfNotExists(final Connection connection) throws SQLException {
-        try {
-            PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN));
+        try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
             ps.execute();
         } catch (final PSQLException ex) {
             if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -80,23 +80,21 @@ public final class PostgreSQLPositionManager implements PositionManager<WalPosit
         }
     }
     
-    private WalPosition getCurrentLsn(final Connection connection) throws SQLException {
-        String sql;
-        if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
-            sql = "SELECT PG_CURRENT_XLOG_LOCATION()";
-        } else if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
-            sql = "SELECT PG_CURRENT_WAL_LSN()";
-        } else {
-            throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
+    private WalPosition getWalPosition(final Connection connection) throws SQLException {
+        try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
+             ResultSet rs = ps.executeQuery()) {
+            rs.next();
+            return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
         }
-        PreparedStatement ps = connection.prepareStatement(sql);
-        ResultSet rs = ps.executeQuery();
-        rs.next();
-        return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
     }
     
-    @Override
-    public void updateCurrentPosition(final WalPosition newPosition) {
-        currentPosition = newPosition;
+    private String getSql(final Connection connection) throws SQLException {
+        if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
+            return "SELECT PG_CURRENT_XLOG_LOCATION()";
+        }
+        if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
+            return "SELECT PG_CURRENT_WAL_LSN()";
+        }
+        throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 70f6243..3186a8a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -18,15 +18,14 @@
 package org.apache.shardingsphere.scaling.postgresql;
 
 import lombok.Setter;
-
 import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 8c8e98d..9b56018 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -29,21 +29,19 @@ import org.postgresql.replication.LogSequenceNumber;
  */
 @RequiredArgsConstructor
 @Getter
-public final class WalPosition implements Position<WalPosition> {
-    
-    private static final long serialVersionUID = -3498484556749679001L;
+public final class WalPosition implements Position {
     
     private static final Gson GSON = new Gson();
     
     private final LogSequenceNumber logSequenceNumber;
     
     @Override
-    public int compareTo(final WalPosition walPosition) {
-        if (null == walPosition) {
+    public int compareTo(final Position position) {
+        if (null == position) {
             return 1;
         }
         long o1 = logSequenceNumber.asLong();
-        long o2 = walPosition.logSequenceNumber.asLong();
+        long o2 = ((WalPosition) position).logSequenceNumber.asLong();
         return Long.compare(o1, o2);
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index a924740..f74c487 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -70,7 +70,7 @@ public final class PostgreSQLPositionManagerTest {
         PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
+        WalPosition actual = postgreSQLPositionManager.getPosition();
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
@@ -78,7 +78,7 @@ public final class PostgreSQLPositionManagerTest {
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
         PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
-        WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
+        WalPosition actual = postgreSQLPositionManager.getPosition();
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
@@ -87,15 +87,15 @@ public final class PostgreSQLPositionManagerTest {
         PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
-        postgreSQLPositionManager.getCurrentPosition();
+        postgreSQLPositionManager.getPosition();
     }
     
     @Test
     public void assertUpdateCurrentPosition() {
         PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
         WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
-        postgreSQLPositionManager.updateCurrentPosition(expected);
-        assertThat(postgreSQLPositionManager.getCurrentPosition(), is(expected));
+        postgreSQLPositionManager.setPosition(expected);
+        assertThat(postgreSQLPositionManager.getPosition(), is(expected));
     }
     
     private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {