You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/14 10:39:04 UTC

[shardingsphere] branch master updated: add scaling adapt feature for openGauss (#12318)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8272069  add scaling adapt feature for openGauss (#12318)
8272069 is described below

commit 82720698b11be7006e2bda800980e4c75ef083b2
Author: justbk2015 <24...@qq.com>
AuthorDate: Tue Sep 14 18:38:29 2021 +0800

    add scaling adapt feature for openGauss (#12318)
    
    Co-authored-by: justbk2015 <24...@qq.com>
---
 pom.xml                                            |   9 +-
 .../shardingsphere-proxy-bootstrap/pom.xml         |   5 +
 .../shardingsphere-scaling-bootstrap/pom.xml       |   6 +
 .../shardingsphere-scaling-dialect/pom.xml         |   1 +
 .../{ => shardingsphere-scaling-opengauss}/pom.xml |  32 +++--
 .../scaling/opengauss/OpenGaussScalingEntry.java   |  67 ++++++++++
 .../opengauss/component/OpenGaussImporter.java     |  42 +++++++
 .../component/OpenGaussPositionInitializer.java    |  68 +++++++++++
 .../component/OpenGaussScalingSQLBuilder.java      |  54 +++++++++
 .../opengauss/component/OpenGaussWalDumper.java}   |  66 ++++++----
 .../opengauss/wal/OpenGaussLogicalReplication.java | 135 +++++++++++++++++++++
 .../wal/decode/OpenGaussLogSequenceNumber.java}    |  33 ++---
 .../wal/decode/OpenGaussTimestampUtils.java}       |  38 +++---
 ...he.shardingsphere.scaling.core.spi.ScalingEntry |  18 +++
 .../opengauss/OpenGaussScalingEntryTest.java       |  49 ++++++++
 .../src/test/resources/logback-test.xml}           |  36 +++---
 .../component/PostgreSQLPositionInitializer.java   |   7 +-
 .../postgresql/component/PostgreSQLWalDumper.java  |   7 +-
 .../scaling/postgresql/wal/LogicalReplication.java |   5 +-
 .../scaling/postgresql/wal/WalPosition.java        |   4 +-
 .../BaseLogSequenceNumber.java}                    |  28 +++--
 ...DecodingPlugin.java => BaseTimestampUtils.java} |  31 +++--
 .../postgresql/wal/decode/DecodingPlugin.java      |   3 +-
 ...lugin.java => PostgreSQLLogSequenceNumber.java} |  28 +++--
 ...ngPlugin.java => PostgreSQLTimestampUtils.java} |  33 +++--
 .../postgresql/wal/decode/TestDecodingPlugin.java  |   6 +-
 .../postgresql/wal/event/AbstractWalEvent.java     |   4 +-
 .../component/PostgreSQLImporterTest.java          |   3 +-
 .../PostgreSQLPositionInitializerTest.java         |   4 +-
 .../component/PostgreSQLScalingSQLBuilderTest.java |   3 +-
 .../component/PostgreSQLWalDumperTest.java         |   3 +-
 .../postgresql/wal/LogicalReplicationTest.java     |   7 +-
 .../scaling/postgresql/wal/WalPositionTest.java    |   7 +-
 .../wal/decode/TestDecodingPluginTest.java         |   8 +-
 34 files changed, 688 insertions(+), 162 deletions(-)

diff --git a/pom.xml b/pom.xml
index f0dcbcc..b2acc0c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@
         <h2.version>1.4.196</h2.version>
         <mysql-connector-java.version>5.1.47</mysql-connector-java.version>
         <postgresql.version>42.2.5</postgresql.version>
+        <opengauss.version>2.0.1-compatibility</opengauss.version>
         <mssql.version>6.1.7.jre8-preview</mssql.version>
         <mariadb-java-client.version>2.4.2</mariadb-java-client.version>
         <bytebuddy.version>1.10.16</bytebuddy.version>
@@ -139,7 +140,7 @@
 
         <maven.deploy.skip>false</maven.deploy.skip>
     </properties>
-    
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -364,6 +365,12 @@
                 <scope>provided</scope>
             </dependency>
             <dependency>
+                <groupId>org.opengauss</groupId>
+                <artifactId>opengauss-jdbc</artifactId>
+                <version>${opengauss.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
                 <groupId>com.microsoft.sqlserver</groupId>
                 <artifactId>mssql-jdbc</artifactId>
                 <version>${mssql.version}</version>
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
index 2ddd0fa..0a63317 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/pom.xml
@@ -150,6 +150,11 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-opengauss</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
             <scope>compile</scope>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
index 2d5e289..23819e6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
@@ -44,6 +44,12 @@
             <artifactId>shardingsphere-scaling-postgresql</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-opengauss</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>org.yaml</groupId>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
index 6125da7..84ff07c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
@@ -32,5 +32,6 @@
     <modules>
         <module>shardingsphere-scaling-mysql</module>
         <module>shardingsphere-scaling-postgresql</module>
+        <module>shardingsphere-scaling-opengauss</module>
     </modules>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/pom.xml
similarity index 55%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/pom.xml
index 6125da7..4c60b51 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/pom.xml
@@ -17,20 +17,32 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.shardingsphere</groupId>
-        <artifactId>shardingsphere-scaling</artifactId>
+        <artifactId>shardingsphere-scaling-dialect</artifactId>
         <version>5.0.0-RC1-SNAPSHOT</version>
     </parent>
-    <artifactId>shardingsphere-scaling-dialect</artifactId>
+    <artifactId>shardingsphere-scaling-opengauss</artifactId>
     <name>${project.artifactId}</name>
-    <packaging>pom</packaging>
-
-    <modules>
-        <module>shardingsphere-scaling-mysql</module>
-        <module>shardingsphere-scaling-postgresql</module>
-    </modules>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-scaling-postgresql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opengauss</groupId>
+            <artifactId>opengauss-jdbc</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
new file mode 100644
index 0000000..07f54b2
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss;
+
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussScalingSQLBuilder;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
+
+/**
+ * Scaling entry for openGauss, reusing the PostgreSQL inventory dumper and environment checker.
+ */
+public final class OpenGaussScalingEntry implements ScalingEntry {
+    
+    @Override
+    public Class<PostgreSQLInventoryDumper> getInventoryDumperClass() {
+        return PostgreSQLInventoryDumper.class;
+    }
+    
+    @Override
+    public Class<OpenGaussWalDumper> getIncrementalDumperClass() {
+        return OpenGaussWalDumper.class;
+    }
+    
+    @Override
+    public Class<OpenGaussPositionInitializer> getPositionInitializerClass() {
+        return OpenGaussPositionInitializer.class;
+    }
+    
+    @Override
+    public Class<OpenGaussImporter> getImporterClass() {
+        return OpenGaussImporter.class;
+    }
+    
+    @Override
+    public Class<PostgreSQLEnvironmentChecker> getEnvironmentCheckerClass() {
+        return PostgreSQLEnvironmentChecker.class;
+    }
+    
+    @Override
+    public Class<OpenGaussScalingSQLBuilder> getSQLBuilderClass() {
+        return OpenGaussScalingSQLBuilder.class;
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "openGauss";
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussImporter.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussImporter.java
new file mode 100644
index 0000000..7b39f35
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussImporter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component;
+
+import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.common.sqlbuilder.ScalingSQLBuilder;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.executor.importer.AbstractImporter;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Importer for openGauss that builds its SQL with {@link OpenGaussScalingSQLBuilder}.
+ */
+public final class OpenGaussImporter extends AbstractImporter {
+
+    public OpenGaussImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
+        super(importerConfig, dataSourceManager);
+    }
+    
+    @Override
+    protected ScalingSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+        return new OpenGaussScalingSQLBuilder(shardingColumnsMap);
+    }
+}
+
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java
new file mode 100644
index 0000000..9245429
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component;
+
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
+import org.apache.shardingsphere.scaling.opengauss.wal.OpenGaussLogicalReplication;
+import org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussLogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.opengauss.replication.LogSequenceNumber;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * OpenGauss WAL position initializer: reads the current xlog location and drops the replication slot on destroy.
+ */
+public final class OpenGaussPositionInitializer implements PositionInitializer {
+    
+    @Override
+    public WalPosition init(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            return getWalPosition(connection);
+        }
+    }
+    
+    @Override
+    public WalPosition init(final String data) {
+        return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data)))
+        );
+    }
+
+    private WalPosition getWalPosition(final Connection connection) throws SQLException {
+        try (PreparedStatement ps = connection.prepareStatement(getSql());
+             ResultSet rs = ps.executeQuery()) {
+            rs.next();
+            return new WalPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(rs.getString(1))));
+        }
+    }
+    
+    private String getSql() {
+        return "SELECT PG_CURRENT_XLOG_LOCATION()";
+    }
+    
+    @Override
+    public void destroy(final DataSource dataSource) throws SQLException {
+        try (Connection conn = dataSource.getConnection()) {
+            OpenGaussLogicalReplication.dropSlot(conn);
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
new file mode 100644
index 0000000..23086a2
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component;
+
+import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.common.sqlbuilder.AbstractScalingSQLBuilder;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * OpenGauss scaling SQL builder that quotes identifiers with double quotes.
+ */
+public final class OpenGaussScalingSQLBuilder extends AbstractScalingSQLBuilder {
+
+    public OpenGaussScalingSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+        super(shardingColumnsMap);
+    }
+    
+    @Override
+    public String getLeftIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    public String getRightIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    public String buildInsertSQL(final DataRecord dataRecord) {
+        return super.buildInsertSQL(dataRecord) + buildConflictSQL();
+    }
+    
+    private String buildConflictSQL() {
+        // TODO: return "ON DUPLICATE KEY UPDATE NOTHING" here once this syntax is supported; empty for now.
+        return "";
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
similarity index 63%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
index 98857d6..3c6cdae 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.component;
+package org.apache.shardingsphere.scaling.opengauss.component;
 
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -28,38 +28,40 @@ import org.apache.shardingsphere.scaling.core.executor.AbstractScalingExecutor;
 import org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
 import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
 import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
-import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
+import org.apache.shardingsphere.scaling.opengauss.wal.OpenGaussLogicalReplication;
+import org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussTimestampUtils;
+import org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussLogSequenceNumber;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
-import org.postgresql.jdbc.PgConnection;
-import org.postgresql.replication.PGReplicationStream;
+import org.opengauss.jdbc.PgConnection;
+import org.opengauss.replication.PGReplicationStream;
 
+import javax.sql.DataSource;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.SQLException;
 
 /**
- * PostgreSQL WAL dumper.
+ * OpenGauss WAL dumper.
  */
 @Slf4j
-public final class PostgreSQLWalDumper extends AbstractScalingExecutor implements IncrementalDumper {
-    
+public final class OpenGaussWalDumper extends AbstractScalingExecutor implements IncrementalDumper {
+
     private final WalPosition walPosition;
-    
+
     private final DumperConfiguration dumperConfig;
-    
-    private final LogicalReplication logicalReplication = new LogicalReplication();
-    
+
+    private final OpenGaussLogicalReplication logicalReplication = new OpenGaussLogicalReplication();
+
     private final WalEventConverter walEventConverter;
-    
+
     @Setter
     private Channel channel;
-    
-    public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final ScalingPosition<WalPosition> position) {
+
+    public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final ScalingPosition<WalPosition> position) {
         walPosition = (WalPosition) position;
         if (!StandardJDBCDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
@@ -73,18 +75,40 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
         super.start();
         dump();
     }
-    
+
+    private PgConnection getReplicationConn() throws SQLException {
+        return logicalReplication
+                .createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig())
+                .unwrap(PgConnection.class);
+    }
+
+    private TestDecodingPlugin initReplication() {
+        TestDecodingPlugin plugin = null;
+        try {
+            DataSource dataSource = dumperConfig.getDataSourceConfig().toDataSource();
+            try (Connection conn = dataSource.getConnection()) {
+                OpenGaussLogicalReplication.createIfNotExists(conn);
+                OpenGaussTimestampUtils utils = new OpenGaussTimestampUtils(conn.unwrap(PgConnection.class).getTimestampUtils());
+                plugin = new TestDecodingPlugin(utils);
+            }
+        } catch (SQLException sqlExp) {
+            log.warn("create replication slot failed!");
+        }
+        return plugin;
+    }
+
     private void dump() {
-        try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
-             PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
-            DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+        TestDecodingPlugin decodingPlugin = initReplication();
+        try (PgConnection pgConnection = getReplicationConn()) {
+            PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, walPosition.getLogSequenceNumber());
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
                     ThreadUtil.sleep(10L);
                     continue;
                 }
-                AbstractWalEvent event = decodingPlugin.decode(message, stream.getLastReceiveLSN());
+                AbstractWalEvent event = decodingPlugin.decode(message,
+                        new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
                 Record record = walEventConverter.convert(event);
                 if (!(event instanceof PlaceholderEvent) && log.isDebugEnabled()) {
                     log.debug("dump, event={}, record={}", event, record);
@@ -95,7 +119,7 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
             throw new ScalingTaskExecuteException(ex);
         }
     }
-    
+
     private void pushRecord(final Record record) {
         try {
             channel.pushRecord(record);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
new file mode 100644
index 0000000..6fcaa31
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal;
+
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
+import org.opengauss.PGProperty;
+import org.opengauss.jdbc.PgConnection;
+import org.opengauss.replication.LogSequenceNumber;
+import org.opengauss.replication.PGReplicationStream;
+import org.opengauss.util.PSQLException;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * OpenGauss logical replication helper for connections, streams and the {@code sharding_scaling} slot.
+ */
+public final class OpenGaussLogicalReplication {
+
+    public static final String SLOT_NAME = "sharding_scaling";
+
+    public static final String DECODE_PLUGIN = "test_decoding";
+
+    public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+
+    /**
+     * Create OpenGauss connection with the replication property set to {@code database}.
+     *
+     * @param jdbcDataSourceConfig JDBC data source configuration supplying URL, user name and password
+     * @return OpenGauss connection
+     * @throws SQLException if the connection cannot be opened
+     */
+    public Connection createPgConnection(final StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+        return createConnection(jdbcDataSourceConfig);
+    }
+    
+    private Connection createConnection(final StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+        Properties props = new Properties();
+        PGProperty.USER.set(props, jdbcDataSourceConfig.getHikariConfig().getUsername());
+        PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getHikariConfig().getPassword());
+        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
+        PGProperty.REPLICATION.set(props, "database");
+        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
+        return DriverManager.getConnection(jdbcDataSourceConfig.getHikariConfig().getJdbcUrl(), props);
+    }
+    
+    /**
+     * Create OpenGauss logical replication stream on the {@code SLOT_NAME} slot.
+     *
+     * @param pgConnection OpenGauss connection
+     * @param startPosition start position, whose wrapped value is cast to the openGauss {@code LogSequenceNumber}
+     * @return replication stream
+     * @throws SQLException if the stream cannot be started
+     */
+    public PGReplicationStream createReplicationStream(final PgConnection pgConnection, final BaseLogSequenceNumber startPosition) throws SQLException {
+        return pgConnection.getReplicationAPI()
+                .replicationStream()
+                .logical()
+                .withSlotName(SLOT_NAME)
+                .withSlotOption("include-xids", true)
+                .withSlotOption("skip-empty-xacts", true)
+                .withStartPosition((LogSequenceNumber) startPosition.get())
+                .start();
+    }
+
+    /**
+     * Create the replication slot, dropping any existing slot with the same name first.
+     *
+     * @param conn the data source connection
+     * @throws SQLException if checking, dropping or creating the slot fails
+     */
+    public static void createIfNotExists(final Connection conn) throws SQLException {
+        if (isSlotNameExist(conn)) {
+            dropSlot(conn);
+        }
+        createSlotBySql(conn);
+    }
+    
+    /**
+     * Drop the {@code SLOT_NAME} replication slot using the given connection.
+     *
+     * @param conn the database connection
+     * @throws SQLException if the drop statement fails
+     */
+    public static void dropSlot(final Connection conn) throws SQLException {
+        String sql = String.format("select * from pg_drop_replication_slot('%s')", SLOT_NAME);
+        try (CallableStatement cs = conn.prepareCall(sql)) {
+            cs.execute();
+        }
+    }
+
+    private static boolean isSlotNameExist(final Connection conn) throws SQLException {
+        String sql = "select * from pg_replication_slots where slot_name=?";
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            ps.setString(1, SLOT_NAME);
+            try (ResultSet rs = ps.executeQuery()) {
+                return rs.next();
+            }
+        }
+    }
+
+    private static void createSlotBySql(final Connection connection) throws SQLException {
+        try (PreparedStatement ps = connection.prepareStatement(
+                String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')",
+                        SLOT_NAME,
+                        DECODE_PLUGIN))) {
+            ps.execute();
+        } catch (final PSQLException ex) {
+            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
+                throw ex;
+            }
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussLogSequenceNumber.java
similarity index 58%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussLogSequenceNumber.java
index fad8ac3..00b4e64 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussLogSequenceNumber.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.wal.decode;
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
 
-import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.replication.LogSequenceNumber;
-
-import java.nio.ByteBuffer;
+import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
+import org.opengauss.replication.LogSequenceNumber;
 
 /**
- * logical replication decoding plugin interface.
+ * OpenGauss log sequence number.
  */
-public interface DecodingPlugin {
+@AllArgsConstructor
+public final class OpenGaussLogSequenceNumber implements BaseLogSequenceNumber {
     
-    /**
-     * Decode wal event from logical replication data.
-     *
-     * @param data of logical replication
-     * @param logSequenceNumber wal lsn
-     * @return wal event
-     */
-    AbstractWalEvent decode(ByteBuffer data, LogSequenceNumber logSequenceNumber);
+    private final LogSequenceNumber logSequenceNumber;
+
+    @Override
+    public long asLong() {
+        return logSequenceNumber.asLong();
+    }
+
+    @Override
+    public Object get() {
+        return logSequenceNumber;
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussTimestampUtils.java
similarity index 52%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussTimestampUtils.java
index b0fbbd0..36dddd4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussTimestampUtils.java
@@ -15,32 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.wal;
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
-import org.postgresql.replication.LogSequenceNumber;
+import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseTimestampUtils;
+import org.opengauss.jdbc.TimestampUtils;
+
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
 
 /**
- * PostgreSQL wal position.
+ * OpenGauss timestamp utils.
  */
-@RequiredArgsConstructor
-@Getter
-public final class WalPosition implements ScalingPosition<WalPosition> {
-    
-    private final LogSequenceNumber logSequenceNumber;
+@AllArgsConstructor
+public final class OpenGaussTimestampUtils implements BaseTimestampUtils {
     
+    private final TimestampUtils timestampUtils;
+
     @Override
-    public int compareTo(final WalPosition position) {
-        if (null == position) {
-            return 1;
-        }
-        return Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
+    public Time toTime(final Calendar cal, final String input) throws SQLException {
+        return timestampUtils.toTime(cal, input);
     }
-    
+
     @Override
-    public String toString() {
-        return String.valueOf(logSequenceNumber.asLong());
+    public Timestamp toTimestamp(final Calendar cal, final String input) throws SQLException {
+        return timestampUtils.toTimestamp(cal, input);
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry
new file mode 100644
index 0000000..4f2d148
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.opengauss.OpenGaussScalingEntry
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
new file mode 100644
index 0000000..cd1a377
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss;
+
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
+import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataConsistencyChecker;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataSourceChecker;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class OpenGaussScalingEntryTest {
+    
+    @Test
+    public void assertGetScalingEntryByDatabaseType() throws IllegalAccessException, InstantiationException {
+        ScalingEntry scalingEntry = ScalingEntryLoader.getInstance("openGauss");
+        assertTrue(scalingEntry instanceof OpenGaussScalingEntry);
+        assertThat(scalingEntry.getPositionInitializerClass(), equalTo(OpenGaussPositionInitializer.class));
+        assertThat(scalingEntry.getEnvironmentCheckerClass(), equalTo(PostgreSQLEnvironmentChecker.class));
+        assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataSourceCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
+        assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataConsistencyCheckerClass(), equalTo(PostgreSQLDataConsistencyChecker.class));
+        assertThat(scalingEntry.getImporterClass(), equalTo(OpenGaussImporter.class));
+        assertThat(scalingEntry.getInventoryDumperClass(), equalTo(PostgreSQLInventoryDumper.class));
+        assertThat(scalingEntry.getIncrementalDumperClass(), equalTo(OpenGaussWalDumper.class));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/resources/logback-test.xml
similarity index 51%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/resources/logback-test.xml
index 6125da7..f2ae907 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/resources/logback-test.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0"?>
 <!--
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
@@ -16,21 +16,19 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.shardingsphere</groupId>
-        <artifactId>shardingsphere-scaling</artifactId>
-        <version>5.0.0-RC1-SNAPSHOT</version>
-    </parent>
-    <artifactId>shardingsphere-scaling-dialect</artifactId>
-    <name>${project.artifactId}</name>
-    <packaging>pom</packaging>
-
-    <modules>
-        <module>shardingsphere-scaling-mysql</module>
-        <module>shardingsphere-scaling-postgresql</module>
-    </modules>
-</project>
+<configuration>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+    <logger name="org.apache.shardingsphere" level="warn" additivity="false">
+        <appender-ref ref="console" />
+    </logger>
+    <logger name="com.zaxxer.hikari" level="warn" />
+    
+    <root>
+        <level value="error" />
+        <appender-ref ref="console" />
+    </root>
+</configuration> 
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index d55fcb1..9973f07 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql.component;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.util.PSQLException;
 
@@ -51,7 +52,8 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     
     @Override
     public WalPosition init(final String data) {
-        return new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(data)));
+        return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data)))
+        );
     }
     
     private void createIfNotExists(final Connection connection) throws SQLException {
@@ -83,7 +85,8 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
         try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
              ResultSet rs = ps.executeQuery()) {
             rs.next();
-            return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
+            return new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(rs.getString(1)))
+            );
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 98857d6..415f86b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -32,9 +32,11 @@ import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLTimestampUtils;
 import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.PGReplicationStream;
 
@@ -77,14 +79,15 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
     private void dump() {
         try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
              PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
-            DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+            PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+            DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
                     ThreadUtil.sleep(10L);
                     continue;
                 }
-                AbstractWalEvent event = decodingPlugin.decode(message, stream.getLastReceiveLSN());
+                AbstractWalEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
                 Record record = walEventConverter.convert(event);
                 if (!(event instanceof PlaceholderEvent) && log.isDebugEnabled()) {
                     log.debug("dump, event={}, record={}", event, record);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
index 3378a39..13aadad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.scaling.postgresql.wal;
 
 import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
 import org.postgresql.PGConnection;
 import org.postgresql.PGProperty;
 import org.postgresql.replication.LogSequenceNumber;
@@ -63,11 +64,11 @@ public final class LogicalReplication {
      * @return replication stream
      * @throws SQLException sql exception
      */
-    public PGReplicationStream createReplicationStream(final Connection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
+    public PGReplicationStream createReplicationStream(final Connection pgConnection, final String slotName, final BaseLogSequenceNumber startPosition) throws SQLException {
         return pgConnection.unwrap(PGConnection.class).getReplicationAPI()
                 .replicationStream()
                 .logical()
-                .withStartPosition(startPosition)
+                .withStartPosition((LogSequenceNumber) startPosition.get())
                 .withSlotName(slotName)
                 .withSlotOption("include-xids", true)
                 .withSlotOption("skip-empty-xacts", true)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index b0fbbd0..1b13a37 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql.wal;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
 
 /**
  * PostgreSQL wal position.
@@ -29,7 +29,7 @@ import org.postgresql.replication.LogSequenceNumber;
 @Getter
 public final class WalPosition implements ScalingPosition<WalPosition> {
     
-    private final LogSequenceNumber logSequenceNumber;
+    private final BaseLogSequenceNumber logSequenceNumber;
     
     @Override
     public int compareTo(final WalPosition position) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseLogSequenceNumber.java
similarity index 66%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseLogSequenceNumber.java
index 07623e0..14de46a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseLogSequenceNumber.java
@@ -15,20 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.wal.event;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.postgresql.replication.LogSequenceNumber;
+package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
 /**
- * Abstract wal event.
+ * Base interface of log sequence number.
  */
-@Getter
-@Setter
-@ToString
-public abstract class AbstractWalEvent {
+public interface BaseLogSequenceNumber {
     
-    private LogSequenceNumber logSequenceNumber;
+    /**
+     * Convert log sequence number to long.
+     *
+     * @return the sequence number as a long value
+     */
+    long asLong();
+
+    /**
+     * Get the bound object.
+     *
+     * @return the bound log sequence number object
+     */
+    Object get();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseTimestampUtils.java
similarity index 60%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseTimestampUtils.java
index fad8ac3..1c09864 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/BaseTimestampUtils.java
@@ -17,22 +17,33 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
-import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.replication.LogSequenceNumber;
-
-import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
 
 /**
  * logical replication decoding plugin interface.
  */
-public interface DecodingPlugin {
+public interface BaseTimestampUtils {
     
     /**
-     * Decode wal event from logical replication data.
+     * Get time.
+     *
+     * @param cal the calendar
+     * @param input the input time string
+     * @return the time converted from the input string
+     * @throws SQLException SQL exception
+     */
+    Time toTime(Calendar cal, String input) throws SQLException;
+
+    /**
+     * Get timestamp.
      *
-     * @param data of logical replication
-     * @param logSequenceNumber wal lsn
-     * @return wal event
+     * @param cal the calendar
+     * @param input the input timestamp string
+     * @return the timestamp converted from the input string
+     * @throws SQLException SQL exception
      */
-    AbstractWalEvent decode(ByteBuffer data, LogSequenceNumber logSequenceNumber);
+    Timestamp toTimestamp(Calendar cal, String input) throws SQLException;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
index fad8ac3..36fb1d6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
 import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.replication.LogSequenceNumber;
 
 import java.nio.ByteBuffer;
 
@@ -34,5 +33,5 @@ public interface DecodingPlugin {
      * @param logSequenceNumber wal lsn
      * @return wal event
      */
-    AbstractWalEvent decode(ByteBuffer data, LogSequenceNumber logSequenceNumber);
+    AbstractWalEvent decode(ByteBuffer data, BaseLogSequenceNumber logSequenceNumber);
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLLogSequenceNumber.java
similarity index 67%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLLogSequenceNumber.java
index fad8ac3..08576e5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -17,22 +17,24 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
-import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
+import lombok.AllArgsConstructor;
 import org.postgresql.replication.LogSequenceNumber;
 
-import java.nio.ByteBuffer;
-
 /**
- * logical replication decoding plugin interface.
+ * PostgreSQL log sequence number.
  */
-public interface DecodingPlugin {
+@AllArgsConstructor
+public final class PostgreSQLLogSequenceNumber implements BaseLogSequenceNumber {
     
-    /**
-     * Decode wal event from logical replication data.
-     *
-     * @param data of logical replication
-     * @param logSequenceNumber wal lsn
-     * @return wal event
-     */
-    AbstractWalEvent decode(ByteBuffer data, LogSequenceNumber logSequenceNumber);
+    private final LogSequenceNumber logSequenceNumber;
+
+    @Override
+    public long asLong() {
+        return logSequenceNumber.asLong();
+    }
+
+    @Override
+    public Object get() {
+        return logSequenceNumber;
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLTimestampUtils.java
similarity index 56%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLTimestampUtils.java
index fad8ac3..4adaeca 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/DecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLTimestampUtils.java
@@ -17,22 +17,29 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
-import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.replication.LogSequenceNumber;
+import lombok.AllArgsConstructor;
+import org.postgresql.jdbc.TimestampUtils;
 
-import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
 
 /**
- * logical replication decoding plugin interface.
+ * PostgreSQL timestamp utils.
  */
-public interface DecodingPlugin {
+@AllArgsConstructor
+public final class PostgreSQLTimestampUtils implements BaseTimestampUtils {
     
-    /**
-     * Decode wal event from logical replication data.
-     *
-     * @param data of logical replication
-     * @param logSequenceNumber wal lsn
-     * @return wal event
-     */
-    AbstractWalEvent decode(ByteBuffer data, LogSequenceNumber logSequenceNumber);
+    private final TimestampUtils timestampUtils;
+
+    @Override
+    public Time toTime(final Calendar cal, final String input) throws SQLException {
+        return timestampUtils.toTime(cal, input);
+    }
+
+    @Override
+    public Timestamp toTimestamp(final Calendar cal, final String input) throws SQLException {
+        return timestampUtils.toTimestamp(cal, input);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPlugin.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPlugin.java
index 8623f72..4997c61 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPlugin.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPlugin.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
-import org.postgresql.jdbc.TimestampUtils;
-import org.postgresql.replication.LogSequenceNumber;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -42,10 +40,10 @@ import java.util.List;
 @AllArgsConstructor
 public final class TestDecodingPlugin implements DecodingPlugin {
     
-    private final TimestampUtils timestampUtils;
+    private final BaseTimestampUtils timestampUtils;
     
     @Override
-    public AbstractWalEvent decode(final ByteBuffer data, final LogSequenceNumber logSequenceNumber) {
+    public AbstractWalEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
         AbstractWalEvent result;
         String eventType = readEventType(data);
         if ("table".equals(eventType)) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java
index 07623e0..5cd21ef 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/AbstractWalEvent.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql.wal.event;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
 
 /**
  * Abstract wal event.
@@ -30,5 +30,5 @@ import org.postgresql.replication.LogSequenceNumber;
 @ToString
 public abstract class AbstractWalEvent {
     
-    private LogSequenceNumber logSequenceNumber;
+    private BaseLogSequenceNumber logSequenceNumber;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporterTest.java
index a3c0dad..c392466 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporterTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.scaling.core.common.record.Column;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -49,7 +50,7 @@ public final class PostgreSQLImporterTest {
     }
     
     private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new WalPosition(LogSequenceNumber.valueOf(100L)), 2);
+        DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
         result.setTableName("t_order");
         result.addColumn(new Column("id", 1, true, true));
         result.addColumn(new Column("name", "", true, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
index 67c956b..c1eb296 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
@@ -72,7 +72,7 @@ public final class PostgreSQLPositionInitializerTest {
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
         WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
-        assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
+        assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
     @Test
@@ -80,7 +80,7 @@ public final class PostgreSQLPositionInitializerTest {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
         WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
-        assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
+        assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
     @Test(expected = RuntimeException.class)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
index 997bb2f..6650d06 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql.component;
 import org.apache.shardingsphere.scaling.core.common.record.Column;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -37,7 +38,7 @@ public final class PostgreSQLScalingSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new WalPosition(LogSequenceNumber.valueOf(100L)), 2);
+        DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
         result.setTableName("t_order");
         result.addColumn(new Column("id", 1, true, true));
         result.addColumn(new Column("name", "", true, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
index b11a5b4..3cd6e0b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCData
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
 import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -70,7 +71,7 @@ public final class PostgreSQLWalDumperTest {
     @Before
     public void setUp() {
         ScalingContext.getInstance().init(new ServerConfiguration());
-        position = new WalPosition(LogSequenceNumber.valueOf(100L));
+        position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position);
         channel = new MemoryChannel(records -> {
         });
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
index 4cbb25e..f4d16a4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.scaling.postgresql.wal;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -79,7 +81,8 @@ public final class LogicalReplicationTest {
         when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder);
         when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder);
         when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
-        logicalReplication.createReplicationStream(pgConnection, "", startPosition);
+        BaseLogSequenceNumber basePosition = new PostgreSQLLogSequenceNumber(startPosition);
+        logicalReplication.createReplicationStream(pgConnection, "", basePosition);
         verify(chainedLogicalStreamBuilder).start();
     }
     
@@ -87,6 +90,6 @@ public final class LogicalReplicationTest {
     @SneakyThrows(SQLException.class)
     public void assertCreateReplicationStreamFailure() {
         when(pgConnection.unwrap(PGConnection.class)).thenThrow(new SQLException(""));
-        logicalReplication.createReplicationStream(pgConnection, "", LogSequenceNumber.valueOf(100L));
+        logicalReplication.createReplicationStream(pgConnection, "", new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
index 9b2c590..d144fb0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal;
 
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -27,13 +28,13 @@ public final class WalPositionTest {
     
     @Test
     public void assertCompareTo() {
-        WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
+        WalPosition walPosition = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         assertThat(walPosition.compareTo(null), is(1));
-        assertThat(walPosition.compareTo(new WalPosition(LogSequenceNumber.valueOf(100L))), is(0));
+        assertThat(walPosition.compareTo(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
     }
     
     @Test
     public void assertToString() {
-        assertThat(new WalPosition(LogSequenceNumber.valueOf(100L)).toString(), is("100"));
+        assertThat(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100"));
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
index f98f8c7..2d9011d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
@@ -38,8 +38,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public final class TestDecodingPluginTest {
-    
-    private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
+
+    private final LogSequenceNumber pgSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
+
+    private final PostgreSQLLogSequenceNumber logSequenceNumber = new PostgreSQLLogSequenceNumber(pgSequenceNumber);
     
     @Test
     public void assertDecodeWriteRowEvent() {
@@ -96,6 +98,6 @@ public final class TestDecodingPluginTest {
         TimestampUtils timestampUtils = mock(TimestampUtils.class);
         when(timestampUtils.toTime(null, "1 2 3'")).thenThrow(new SQLException(""));
         ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes());
-        new TestDecodingPlugin(timestampUtils).decode(data, logSequenceNumber);
+        new TestDecodingPlugin(new PostgreSQLTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
     }
 }