You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/02/08 07:49:57 UTC

[shardingsphere] branch master updated: Add YamlJobProgress (#9386)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f72a69  Add YamlJobProgress (#9386)
7f72a69 is described below

commit 7f72a69831c950a2e1476216c75a6f3e95fd8466
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 8 15:49:22 2021 +0800

    Add YamlJobProgress (#9386)
    
    * Optimize ScalingJobPreparer
    
    * Add YamlJobProgress
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../core/api/impl/RegistryRepositoryAPIImpl.java   |  14 +--
 .../core/job/position/PlaceholderPosition.java     |  28 +----
 .../core/job/position/PositionInitializer.java     |  12 +-
 .../job/position/PositionInitializerFactory.java   |  13 +-
 .../core/job/position/PrimaryKeyPosition.java      |  43 +++----
 .../core/job/preparer/ScalingJobPreparer.java      |   4 +-
 .../scaling/core/job/progress/JobProgress.java     | 137 +++------------------
 .../job/progress/yaml/JobProgressYamlSwapper.java  | 123 ++++++++++++++++++
 .../core/job/progress/yaml/YamlJobProgress.java}   |  45 ++++---
 .../scaling/core/spi/ScalingEntry.java             |   2 +-
 .../scaling/core/util/ReflectionUtil.java          |  15 ---
 .../core/fixture/FixtureH2ScalingEntry.java        |   2 +-
 .../core/fixture/FixturePositionInitializer.java   |   7 +-
 .../core/job/position/PlaceholderPositionTest.java |   8 +-
 .../core/job/position/PrimaryKeyPositionTest.java  |  10 +-
 .../scaling/core/job/progress/JobProgressTest.java | 103 ++++++++++++++++
 .../src/test/resources/logback-test.xml            |   3 +-
 .../scaling/mysql/MySQLScalingEntry.java           |   4 +-
 .../scaling/mysql/binlog/BinlogPosition.java       |   9 +-
 .../scaling/mysql/component/MySQLBinlogDumper.java |   6 +-
 .../mysql/component/MySQLPositionInitializer.java  |  10 +-
 .../scaling/mysql/binlog/BinlogPositionTest.java   |   6 +-
 .../scaling/postgresql/PostgreSQLScalingEntry.java |   2 +-
 .../component/PostgreSQLPositionInitializer.java   |   7 +-
 .../scaling/postgresql/wal/WalPosition.java        |  11 +-
 .../scaling/postgresql/wal/WalPositionTest.java    |  15 +--
 26 files changed, 363 insertions(+), 276 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 06dbbd5..4852f33 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -46,12 +46,12 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     
     @Override
     public void persistJobProgress(final JobContext jobContext) {
-        JobProgress jobPosition = new JobProgress();
-        jobPosition.setStatus(jobContext.getStatus());
-        jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
-        jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
-        jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
-        registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+        JobProgress jobProgress = new JobProgress();
+        jobProgress.setStatus(jobContext.getStatus());
+        jobProgress.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
+        jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
+        jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
+        registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobProgress.toString());
     }
     
     private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -78,7 +78,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         } catch (final NullPointerException ex) {
             log.info("job {}-{} without break point.", jobId, shardingItem);
         }
-        return Strings.isNullOrEmpty(data) ? null : JobProgress.fromJson(data);
+        return Strings.isNullOrEmpty(data) ? null : JobProgress.init(data);
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
index 67abb5a..f79c5e1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
@@ -17,17 +17,9 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import com.google.gson.TypeAdapter;
-import com.google.gson.annotations.JsonAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-
 /**
  * Placeholder position.
  */
-@JsonAdapter(PlaceholderPosition.PositionTypeAdapter.class)
 public final class PlaceholderPosition implements Position<PlaceholderPosition> {
     
     @Override
@@ -35,22 +27,8 @@ public final class PlaceholderPosition implements Position<PlaceholderPosition>
         return 1;
     }
     
-    /**
-     * Position type adapter.
-     */
-    public static class PositionTypeAdapter extends TypeAdapter<PlaceholderPosition> {
-        
-        @Override
-        public void write(final JsonWriter out, final PlaceholderPosition value) throws IOException {
-            out.beginArray();
-            out.endArray();
-        }
-        
-        @Override
-        public PlaceholderPosition read(final JsonReader in) throws IOException {
-            in.beginArray();
-            in.endArray();
-            return new PlaceholderPosition();
-        }
+    @Override
+    public String toString() {
+        return "";
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 4727d32..ab3bfbb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -23,7 +23,7 @@ import java.sql.SQLException;
 /**
  * Position initializer.
  */
-public interface PositionInitializer<T extends Position<?>> {
+public interface PositionInitializer {
     
     /**
      * Init position by data source.
@@ -32,5 +32,13 @@ public interface PositionInitializer<T extends Position<?>> {
      * @return position
      * @throws SQLException SQL exception
      */
-    T init(DataSource dataSource) throws SQLException;
+    Position<?> init(DataSource dataSource) throws SQLException;
+    
+    /**
+     * Init position by string data.
+     *
+     * @param data string data
+     * @return position
+     */
+    Position<?> init(String data);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
index c6d7617..0bbf66f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
@@ -21,7 +21,6 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
 
 /**
  * Position initializer factory.
@@ -36,17 +35,7 @@ public final class PositionInitializerFactory {
      * @return position initializer
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static PositionInitializer<?> newInstance(final String databaseType) {
+    public static PositionInitializer newInstance(final String databaseType) {
         return ScalingEntryLoader.getInstance(databaseType).getPositionInitializer().newInstance();
     }
-    
-    /**
-     * Get position type.
-     *
-     * @param databaseType database type
-     * @return position type
-     */
-    public static Class<?> getPositionClass(final String databaseType) {
-        return ReflectionUtil.getInterfaceGenericClass(ScalingEntryLoader.getInstance(databaseType).getPositionInitializer());
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 39e1d95..f7325aa 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -17,27 +17,33 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import com.google.gson.TypeAdapter;
-import com.google.gson.annotations.JsonAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
+import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
-import java.io.IOException;
-
 /**
  * Use primary key as position.
  */
 @RequiredArgsConstructor
 @Getter
-@JsonAdapter(PrimaryKeyPosition.PositionTypeAdapter.class)
 public final class PrimaryKeyPosition implements Position<PrimaryKeyPosition> {
     
     private final long beginValue;
     
     private final long endValue;
     
+    /**
+     * Init by string data.
+     *
+     * @param data string data
+     * @return primary key position
+     */
+    public static PrimaryKeyPosition init(final String data) {
+        String[] array = data.split(",");
+        Preconditions.checkArgument(array.length == 2, "Unknown primary key position: " + data);
+        return new PrimaryKeyPosition(Long.parseLong(array[0]), Long.parseLong(array[1]));
+    }
+    
     @Override
     public int compareTo(final PrimaryKeyPosition position) {
         if (null == position) {
@@ -46,25 +52,8 @@ public final class PrimaryKeyPosition implements Position<PrimaryKeyPosition> {
         return Long.compare(beginValue, position.beginValue);
     }
     
-    /**
-     * Position type adapter.
-     */
-    public static class PositionTypeAdapter extends TypeAdapter<PrimaryKeyPosition> {
-        
-        @Override
-        public void write(final JsonWriter out, final PrimaryKeyPosition value) throws IOException {
-            out.beginArray();
-            out.value(value.getBeginValue());
-            out.value(value.getEndValue());
-            out.endArray();
-        }
-        
-        @Override
-        public PrimaryKeyPosition read(final JsonReader in) throws IOException {
-            in.beginArray();
-            PrimaryKeyPosition result = new PrimaryKeyPosition(in.nextLong(), in.nextLong());
-            in.endArray();
-            return result;
-        }
+    @Override
+    public String toString() {
+        return String.format("%d,%d", beginValue, endValue);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 11ca2c8..d258161 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -61,7 +61,9 @@ public final class ScalingJobPreparer {
     
     private void checkDataSource(final JobContext jobContext, final DataSourceManager dataSourceManager) {
         checkSourceDataSources(jobContext, dataSourceManager);
-        checkTargetDataSources(jobContext, dataSourceManager);
+        if (null == jobContext.getInitProgress()) {
+            checkTargetDataSources(jobContext, dataSourceManager);
+        }
     }
     
     private void checkSourceDataSources(final JobContext jobContext, final DataSourceManager dataSourceManager) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
index 776e73f..32eabbe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
@@ -17,28 +17,16 @@
 
 package org.apache.shardingsphere.scaling.core.job.progress;
 
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.TypeAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
 import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskDelay;
+import org.apache.shardingsphere.scaling.core.job.progress.yaml.JobProgressYamlSwapper;
+import org.apache.shardingsphere.scaling.core.job.progress.yaml.YamlJobProgress;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
@@ -51,9 +39,7 @@ import java.util.stream.Collectors;
 @Setter
 public final class JobProgress {
     
-    private static final Gson GSON = new Gson();
-    
-    private static final Gson INVENTORY_POSITION_ADAPTED_GSON = new GsonBuilder().registerTypeHierarchyAdapter(Position.class, new InventoryPositionTypeAdapter()).create();
+    private static final JobProgressYamlSwapper JOB_PROGRESS_YAML_SWAPPER = new JobProgressYamlSwapper();
     
     private JobStatus status = JobStatus.RUNNING;
     
@@ -63,6 +49,17 @@ public final class JobProgress {
     
     private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
     
+    
+    /**
+     * Init by string data.
+     *
+     * @param data string data
+     * @return job progress
+     */
+    public static JobProgress init(final String data) {
+        return JOB_PROGRESS_YAML_SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlJobProgress.class));
+    }
+    
     /**
      * Get incremental position.
      *
@@ -86,106 +83,8 @@ public final class JobProgress {
                 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition()));
     }
     
-    /**
-     * To json.
-     *
-     * @return json data
-     */
-    public String toJson() {
-        JsonObject result = new JsonObject();
-        result.addProperty("status", status.name());
-        result.addProperty("databaseType", databaseType);
-        result.add("inventory", getInventoryJson());
-        result.add("incremental", getIncrementalJson());
-        return result.toString();
-    }
-    
-    private JsonElement getInventoryJson() {
-        JsonObject result = new JsonObject();
-        result.add("finished", getInventoryFinishedJson());
-        result.add("unfinished", getInventoryUnfinishedJson());
-        return result;
-    }
-    
-    private JsonElement getInventoryFinishedJson() {
-        JsonArray result = new JsonArray();
-        inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
-                .forEach(entry -> result.add(entry.getKey()));
-        return result;
-    }
-    
-    private JsonElement getInventoryUnfinishedJson() {
-        JsonObject result = new JsonObject();
-        inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
-                .forEach(entry -> result.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getValue().getClass())));
-        return result;
-    }
-    
-    private JsonElement getIncrementalJson() {
-        JsonObject result = new JsonObject();
-        incrementalTaskProgressMap.forEach((key, value) -> result.add(key, getIncrementalJson(value)));
-        return result;
-    }
-    
-    private JsonElement getIncrementalJson(final IncrementalTaskProgress incrementalTaskProgress) {
-        JsonObject result = new JsonObject();
-        result.add("position", GSON.toJsonTree(incrementalTaskProgress.getPosition(), incrementalTaskProgress.getPosition().getClass()));
-        result.add("delay", GSON.toJsonTree(incrementalTaskProgress.getIncrementalTaskDelay()));
-        return result;
-    }
-    
-    /**
-     * From json.
-     *
-     * @param data json data
-     * @return job position
-     */
-    public static JobProgress fromJson(final String data) {
-        JobProgress result = new JobProgress();
-        JsonObject jsonObject = GSON.fromJson(data, JsonObject.class);
-        result.setStatus(JobStatus.valueOf(jsonObject.get("status").getAsString()));
-        result.setDatabaseType(jsonObject.get("databaseType").getAsString());
-        result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jsonObject.get("inventory").getAsJsonObject()));
-        result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jsonObject.get("incremental").getAsJsonObject(), jsonObject.get("databaseType").getAsString()));
-        return result;
-    }
-    
-    private static Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final JsonObject inventory) {
-        JsonObject jsonObject = new JsonObject();
-        jsonObject.add("inventory", inventory);
-        return INVENTORY_POSITION_ADAPTED_GSON.fromJson(jsonObject, JobProgress.class).inventoryTaskProgressMap;
-    }
-    
-    private static Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JsonObject incremental, final String databaseType) {
-        Class<?> incrementalPositionClass = PositionInitializerFactory.getPositionClass(databaseType);
-        Map<String, IncrementalTaskProgress> result = Maps.newHashMap();
-        for (String each : incremental.keySet()) {
-            Position<?> position = (Position<?>) GSON.fromJson(incremental.get(each).getAsJsonObject().get("position"), incrementalPositionClass);
-            IncrementalTaskDelay incrementalTaskDelay = GSON.fromJson(incremental.get(each).getAsJsonObject().get("delay"), IncrementalTaskDelay.class);
-            result.put(each, new IncrementalTaskProgress(position, incrementalTaskDelay));
-        }
-        return result;
-    }
-    
-    private static class InventoryPositionTypeAdapter extends TypeAdapter<Position<?>> {
-        
-        @Override
-        public void write(final JsonWriter out, final Position<?> value) throws IOException {
-            if (value instanceof PrimaryKeyPosition) {
-                new PrimaryKeyPosition.PositionTypeAdapter().write(out, (PrimaryKeyPosition) value);
-            } else if (value instanceof PlaceholderPosition) {
-                new PlaceholderPosition.PositionTypeAdapter().write(out, (PlaceholderPosition) value);
-            }
-        }
-        
-        @Override
-        public Position<?> read(final JsonReader in) throws IOException {
-            in.beginArray();
-            Position<?> result = in.hasNext() ? new PrimaryKeyPosition(in.nextLong(), in.nextLong()) : new PlaceholderPosition();
-            in.endArray();
-            return result;
-        }
+    @Override
+    public String toString() {
+        return YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(this));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java
new file mode 100644
index 0000000..7996273
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.progress.yaml;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ *Job progress yaml swapper.
+ */
+public final class JobProgressYamlSwapper {
+    
+    /**
+     * Swap to yaml.
+     *
+     * @param jobProgress job progress
+     * @return yaml job progress
+     */
+    public YamlJobProgress swapToYaml(final JobProgress jobProgress) {
+        YamlJobProgress result = new YamlJobProgress();
+        result.setStatus(jobProgress.getStatus().name());
+        result.setDatabaseType(jobProgress.getDatabaseType());
+        result.setInventory(getYamlInventory(jobProgress.getInventoryTaskProgressMap()));
+        result.setIncremental(getYamlIncremental(jobProgress.getIncrementalTaskProgressMap()));
+        return result;
+    }
+    
+    private YamlJobProgress.YamlInventory getYamlInventory(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+        YamlJobProgress.YamlInventory result = new YamlJobProgress.YamlInventory();
+        result.setFinished(getFinished(inventoryTaskProgressMap));
+        result.setUnfinished(getUnfinished(inventoryTaskProgressMap));
+        return result;
+    }
+    
+    private String[] getFinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+        return inventoryTaskProgressMap.entrySet().stream()
+                .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
+                .map(Entry::getKey)
+                .toArray(String[]::new);
+    }
+    
+    private Map<String, String> getUnfinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+        return inventoryTaskProgressMap.entrySet().stream()
+                .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
+                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString()));
+    }
+    
+    private Map<String, YamlJobProgress.YamlIncremental> getYamlIncremental(final Map<String, IncrementalTaskProgress> incrementalTaskProgressMap) {
+        return incrementalTaskProgressMap.entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, entry -> {
+                    YamlJobProgress.YamlIncremental yamlIncremental = new YamlJobProgress.YamlIncremental();
+                    yamlIncremental.setPosition(entry.getValue().getPosition().toString());
+                    yamlIncremental.setDelay(entry.getValue().getIncrementalTaskDelay());
+                    return yamlIncremental;
+                }));
+    }
+    
+    /**
+     * Swap to object.
+     *
+     * @param yamlJobProgress yaml job progress
+     * @return job progress
+     */
+    public JobProgress swapToObject(final YamlJobProgress yamlJobProgress) {
+        JobProgress result = new JobProgress();
+        result.setStatus(JobStatus.valueOf(yamlJobProgress.getStatus()));
+        result.setDatabaseType(yamlJobProgress.getDatabaseType());
+        result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(yamlJobProgress.getInventory()));
+        result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(yamlJobProgress.getDatabaseType(), yamlJobProgress.getIncremental()));
+        return result;
+    }
+    
+    private Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final YamlJobProgress.YamlInventory inventory) {
+        Map<String, InventoryTaskProgress> result = Maps.newHashMap();
+        result.putAll(Arrays.stream(inventory.getFinished())
+                .collect(Collectors.toMap(each -> each, each -> new InventoryTaskProgress(new FinishedPosition()))));
+        result.putAll(inventory.getUnfinished().entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, getInventoryTaskProgressFunction())));
+        return result;
+    }
+    
+    private Function<Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
+        return entry -> new InventoryTaskProgress(Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPosition.init(entry.getValue()));
+    }
+    
+    private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final String databaseType, final Map<String, YamlJobProgress.YamlIncremental> incremental) {
+        return incremental.entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, getIncrementalTaskProgressFunction(databaseType)));
+    }
+    
+    private Function<Entry<String, YamlJobProgress.YamlIncremental>, IncrementalTaskProgress> getIncrementalTaskProgressFunction(final String databaseType) {
+        return entry -> new IncrementalTaskProgress(PositionInitializerFactory.newInstance(databaseType).init(entry.getValue().getPosition()), entry.getValue().getDelay());
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
similarity index 54%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
index 087c8db..c27d949 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
@@ -15,33 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.wal;
+package org.apache.shardingsphere.scaling.core.job.progress.yaml;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskDelay;
+
+import java.util.Map;
 
 /**
- * PostgreSQL wal position.
+ * Yaml job progress.
  */
-@RequiredArgsConstructor
-@AllArgsConstructor
 @Getter
 @Setter
-public final class WalPosition implements Position<WalPosition> {
+public final class YamlJobProgress {
+    
+    private String status;
+    
+    private String databaseType;
     
-    private final LogSequenceNumber logSequenceNumber;
+    private YamlInventory inventory;
     
-    private long delay;
+    private Map<String, YamlIncremental> incremental;
+    
+    @Getter
+    @Setter
+    public static final class YamlInventory {
+        
+        private String[] finished;
+        
+        private Map<String, String> unfinished;
+    }
     
-    @Override
-    public int compareTo(final WalPosition position) {
-        if (null == position) {
-            return 1;
-        }
-        return Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
+    @Getter
+    @Setter
+    public static final class YamlIncremental {
+        
+        private String position;
+        
+        private IncrementalTaskDelay delay;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index d419ec2..011542e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -50,7 +50,7 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
      *
      * @return position initializer type
      */
-    Class<? extends PositionInitializer<?>> getPositionInitializer();
+    Class<? extends PositionInitializer> getPositionInitializer();
     
     /**
      * Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
index 742e5f4..61e12a0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
@@ -17,15 +17,12 @@
 
 package org.apache.shardingsphere.scaling.core.util;
 
-import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 
 /**
  * Reflection utils.
@@ -108,16 +105,4 @@ public final class ReflectionUtil {
         method.setAccessible(true);
         method.invoke(target, parameterValues);
     }
-    
-    /**
-     * Get interface generic class.
-     *
-     * @param clazz class
-     * @return generic class
-     */
-    public static Class<?> getInterfaceGenericClass(final Class<?> clazz) {
-        Type[] types = clazz.getGenericInterfaces();
-        Preconditions.checkState(types.length == 1, "Only supported one generic type");
-        return (Class<?>) ((ParameterizedType) types[0]).getActualTypeArguments()[0];
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 2a92275..8ab5162 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -39,7 +39,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+    public Class<? extends PositionInitializer> getPositionInitializer() {
         return FixturePositionInitializer.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
index ac58683..074c5a1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
@@ -22,10 +22,15 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 
 import javax.sql.DataSource;
 
-public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
+public final class FixturePositionInitializer implements PositionInitializer {
     
     @Override
     public PlaceholderPosition init(final DataSource dataSource) {
         return new PlaceholderPosition();
     }
+    
+    @Override
+    public PlaceholderPosition init(final String data) {
+        return new PlaceholderPosition();
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
index 5dc9f09..e57c5bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import com.google.gson.Gson;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -25,8 +24,6 @@ import static org.junit.Assert.assertThat;
 
 public final class PlaceholderPositionTest {
     
-    public static final Gson GSON = new Gson();
-    
     @Test
     public void assertCompareTo() {
         PlaceholderPosition position1 = new PlaceholderPosition();
@@ -35,8 +32,7 @@ public final class PlaceholderPositionTest {
     }
     
     @Test
-    public void assertTypeAdapter() {
-        PlaceholderPosition position = GSON.fromJson("[]", PlaceholderPosition.class);
-        assertThat(GSON.toJson(position), is("[]"));
+    public void assertToString() {
+        assertThat(new PlaceholderPosition().toString(), is(""));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
index ae1e0ab..2fbc79e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.job.position;
 
-import com.google.gson.Gson;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -35,15 +34,14 @@ public final class PrimaryKeyPositionTest {
     }
     
     @Test
-    public void assertFormJson() {
-        PrimaryKeyPosition position = new Gson().fromJson("[1,100]", PrimaryKeyPosition.class);
+    public void assertInit() {
+        PrimaryKeyPosition position = PrimaryKeyPosition.init("1,100");
         assertThat(position.getBeginValue(), is(1L));
         assertThat(position.getEndValue(), is(100L));
     }
     
     @Test
-    public void assertToJson() {
-        PrimaryKeyPosition position = new PrimaryKeyPosition(1, 100);
-        assertThat(new Gson().toJson(position), is("[1,100]"));
+    public void assertToString() {
+        assertThat(new PrimaryKeyPosition(1, 100).toString(), is("1,100"));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java
new file mode 100644
index 0000000..2a8facc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.progress;
+
+import com.google.common.collect.Maps;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class JobProgressTest {
+    
+    @Test
+    public void assertInit() {
+        JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+        assertThat(jobProgress.getStatus(), is(JobStatus.RUNNING));
+        assertThat(jobProgress.getDatabaseType(), is("H2"));
+        assertThat(jobProgress.getInventoryTaskProgressMap().size(), is(4));
+        assertThat(jobProgress.getIncrementalTaskProgressMap().size(), is(1));
+    }
+    
+    @Test
+    public void assertGetIncrementalPosition() {
+        JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+        assertTrue(jobProgress.getIncrementalPosition("ds0") instanceof PlaceholderPosition);
+    }
+    
+    @Test
+    public void assertGetInventoryPosition() {
+        JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+        assertThat(jobProgress.getInventoryPosition("ds0").size(), is(2));
+        assertTrue(jobProgress.getInventoryPosition("ds0").get("ds0.t_1") instanceof FinishedPosition);
+        assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_1") instanceof PlaceholderPosition);
+        assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_2") instanceof PrimaryKeyPosition);
+    }
+    
+    @Test
+    public void assertToString() {
+        JobProgress jobProgress = new JobProgress();
+        jobProgress.setStatus(JobStatus.RUNNING);
+        jobProgress.setDatabaseType("H2");
+        jobProgress.setIncrementalTaskProgressMap(mockIncrementalTaskProgressMap());
+        jobProgress.setInventoryTaskProgressMap(mockInventoryTaskProgressMap());
+        assertThat(jobProgress.toString(), is(mockJobProgressYamlString()));
+    }
+    
+    private Map<String, IncrementalTaskProgress> mockIncrementalTaskProgressMap() {
+        Map<String, IncrementalTaskProgress> result = Maps.newHashMap();
+        result.put("ds0", new IncrementalTaskProgress(new PlaceholderPosition()));
+        return result;
+    }
+    
+    private Map<String, InventoryTaskProgress> mockInventoryTaskProgressMap() {
+        Map<String, InventoryTaskProgress> result = Maps.newHashMap();
+        result.put("ds0.t_1", new InventoryTaskProgress(new FinishedPosition()));
+        result.put("ds0.t_2", new InventoryTaskProgress(new FinishedPosition()));
+        result.put("ds1.t_1", new InventoryTaskProgress(new PlaceholderPosition()));
+        result.put("ds1.t_2", new InventoryTaskProgress(new PrimaryKeyPosition(1, 2)));
+        return result;
+    }
+    
+    private String mockJobProgressYamlString() {
+        return "databaseType: H2\n"
+                + "incremental:\n"
+                + "  ds0:\n"
+                + "    delay:\n"
+                + "      delayMilliseconds: -1\n"
+                + "      lastEventTimestamps: 0\n"
+                + "    position: ''\n"
+                + "inventory:\n"
+                + "  finished:\n"
+                + "  - ds0.t_2\n"
+                + "  - ds0.t_1\n"
+                + "  unfinished:\n"
+                + "    ds1.t_2: 1,2\n"
+                + "    ds1.t_1: ''\n"
+                + "status: RUNNING\n";
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
index be2ff44..f391881 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
@@ -26,8 +26,9 @@
         <appender-ref ref="console" />
     </logger>
     <logger name="com.zaxxer.hikari" level="warn" />
+    <logger name="org.apache.zookeeper.ClientCnxn" level="off" />
     <logger name="org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter" level="off" />
-    
+
     <root>
         <level value="info" />
         <appender-ref ref="console" />
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index b303d17..85ca7d3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
+import org.apache.shardingsphere.scaling.core.common.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.executor.dumper.JDBCDumper;
 import org.apache.shardingsphere.scaling.core.executor.dumper.LogDumper;
 import org.apache.shardingsphere.scaling.core.executor.importer.Importer;
-import org.apache.shardingsphere.scaling.core.common.sqlbuilder.ScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -49,7 +49,7 @@ public final class MySQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+    public Class<? extends PositionInitializer> getPositionInitializer() {
         return MySQLPositionInitializer.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index a069f66..edc4c48 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -36,9 +36,7 @@ public final class BinlogPosition implements Position<BinlogPosition> {
     
     private final long position;
     
-    private transient long serverId;
-    
-    private long delay;
+    private long serverId;
     
     @Override
     public int compareTo(final BinlogPosition position) {
@@ -51,4 +49,9 @@ public final class BinlogPosition implements Position<BinlogPosition> {
     private long toLong() {
         return Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)) << 32 | position;
     }
+    
+    @Override
+    public String toString() {
+        return String.format("%s#%d", filename, position);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
index 1bcc5d7..d298735 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
@@ -159,16 +159,14 @@ public final class MySQLBinlogDumper extends AbstractScalingExecutor implements
     }
     
     private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
-        long delay = System.currentTimeMillis() - rowsEvent.getTimestamp() * 1000;
-        DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId(), delay), columnCount);
+        DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
         result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
         result.setCommitTime(rowsEvent.getTimestamp() * 1000);
         return result;
     }
     
     private void createPlaceholderRecord(final AbstractBinlogEvent event) {
-        long delay = System.currentTimeMillis() - event.getTimestamp() * 1000;
-        PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId(), delay));
+        PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
         record.setCommitTime(event.getTimestamp() * 1000);
         pushRecord(record);
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
index b4eae3e..d5ee6e0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.mysql.component;
 
+import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
 
@@ -29,7 +30,7 @@ import java.sql.SQLException;
 /**
  * MySQL binlog position initializer.
  */
-public final class MySQLPositionInitializer implements PositionInitializer<BinlogPosition> {
+public final class MySQLPositionInitializer implements PositionInitializer {
     
     @Override
     public BinlogPosition init(final DataSource dataSource) throws SQLException {
@@ -40,6 +41,13 @@ public final class MySQLPositionInitializer implements PositionInitializer<Binlo
         }
     }
     
+    @Override
+    public BinlogPosition init(final String data) {
+        String[] array = data.split("#");
+        Preconditions.checkArgument(array.length == 2, "Unknown binlog position: " + data);
+        return new BinlogPosition(array[0], Long.parseLong(array[1]));
+    }
+    
     private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
         try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
              ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
index 75aedce..7b3d68c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.mysql.binlog;
 
-import com.google.gson.Gson;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -41,8 +40,7 @@ public final class BinlogPositionTest {
     }
     
     @Test
-    public void assertToJson() {
-        BinlogPosition binlogPosition = new BinlogPosition("mysql-bin.000001", 4);
-        assertThat(new Gson().toJson(binlogPosition), is("{\"filename\":\"mysql-bin.000001\",\"position\":4,\"delay\":0}"));
+    public void assertToString() {
+        assertThat(new BinlogPosition("mysql-bin.000001", 4).toString(), is("mysql-bin.000001#4"));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index fc506ed..3b9d862 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -49,7 +49,7 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+    public Class<? extends PositionInitializer> getPositionInitializer() {
         return PostgreSQLPositionInitializer.class;
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 11e84db..457bdc3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -31,7 +31,7 @@ import java.sql.SQLException;
 /**
  * PostgreSQL wal position initializer.
  */
-public final class PostgreSQLPositionInitializer implements PositionInitializer<WalPosition> {
+public final class PostgreSQLPositionInitializer implements PositionInitializer {
     
     public static final String SLOT_NAME = "sharding_scaling";
     
@@ -47,6 +47,11 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer<
         }
     }
     
+    @Override
+    public WalPosition init(final String data) {
+        return new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(data)));
+    }
+    
     private void createIfNotExists(final Connection connection) throws SQLException {
         try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
             ps.execute();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 087c8db..07ef76c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -17,10 +17,8 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.job.position.Position;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -28,15 +26,11 @@ import org.postgresql.replication.LogSequenceNumber;
  * PostgreSQL wal position.
  */
 @RequiredArgsConstructor
-@AllArgsConstructor
 @Getter
-@Setter
 public final class WalPosition implements Position<WalPosition> {
     
     private final LogSequenceNumber logSequenceNumber;
     
-    private long delay;
-    
     @Override
     public int compareTo(final WalPosition position) {
         if (null == position) {
@@ -44,4 +38,9 @@ public final class WalPosition implements Position<WalPosition> {
         }
         return Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
     }
+    
+    @Override
+    public String toString() {
+        return String.valueOf(logSequenceNumber.asLong());
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
index 967622f..9b2c590 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal;
 
-import com.google.gson.Gson;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -26,8 +25,6 @@ import static org.junit.Assert.assertThat;
 
 public final class WalPositionTest {
     
-    private static final Gson GSON = new Gson();
-    
     @Test
     public void assertCompareTo() {
         WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
@@ -36,15 +33,7 @@ public final class WalPositionTest {
     }
     
     @Test
-    public void assertToJson() {
-        WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
-        assertThat(GSON.toJson(walPosition), is("{\"logSequenceNumber\":{\"value\":100},\"delay\":0}"));
-    }
-    
-    @Test
-    public void assertFromJson() {
-        WalPosition walPosition = GSON.fromJson("{\"logSequenceNumber\":{\"value\":100},\"delay\":0}", WalPosition.class);
-        assertThat(walPosition.getLogSequenceNumber().asLong(), is(100L));
-        assertThat(walPosition.getDelay(), is(0L));
+    public void assertToString() {
+        assertThat(new WalPosition(LogSequenceNumber.valueOf(100L)).toString(), is("100"));
     }
 }