You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/21 11:39:18 UTC

[shardingsphere] branch master updated: add unit test for shardingsphere-scaling-postgresql (#6983)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e28d94  add unit test for shardingsphere-scaling-postgresql (#6983)
4e28d94 is described below

commit 4e28d946c00e9faf15f86c9fe6bc21cc2baaf8ed
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Aug 21 19:39:03 2020 +0800

    add unit test for shardingsphere-scaling-postgresql (#6983)
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/mysql/MySQLJdbcDumperTest.java         |   3 -
 .../shardingsphere-scaling-postgresql/pom.xml      |   5 +
 .../scaling/postgresql/PostgreSQLJdbcDumper.java   |   2 +-
 .../scaling/postgresql/PostgreSQLWalDumper.java    |   5 +-
 .../scaling/postgresql/wal/LogicalReplication.java |  29 ++--
 .../PostgreSQLDataSourceCheckerTest.java           |  45 ++++---
 .../scaling/postgresql/PostgreSQLImporterTest.java |  57 ++++++++
 .../postgresql/PostgreSQLJdbcDumperTest.java}      |  47 ++-----
 .../postgresql/PostgreSQLWalDumperTest.java        | 100 ++++++++++++++
 .../scaling/postgresql/utils/ReflectionUtil.java   |  85 ++++++++++++
 .../postgresql/wal/LogicalReplicationTest.java     |  94 +++++++++++++
 .../postgresql/wal/WalEventConverterTest.java      | 147 +++++++++++++++++++++
 .../scaling/postgresql/wal/WalPositionTest.java    |  40 ++++++
 .../wal/decode/TestDecodingPluginTest.java         |  57 ++++++--
 14 files changed, 631 insertions(+), 85 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
index 506170e..f2cb4e7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
@@ -24,8 +24,6 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -42,7 +40,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
 public final class MySQLJdbcDumperTest {
     
     private DataSourceManager dataSourceManager;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
index 5c0af86..91b1b80 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
@@ -40,5 +40,10 @@
             <artifactId>postgresql</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
index a29f0c2..ad4da9a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
@@ -38,7 +38,7 @@ public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper {
     @Override
     protected PreparedStatement createPreparedStatement(final Connection conn, final String sql) throws SQLException {
         PreparedStatement result = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-        result.setFetchSize(1);
+        result.setFetchSize(100);
         return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 8965bc5..50f5b10 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.PGConnection;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.PGReplicationStream;
 
@@ -74,8 +73,8 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
     
     private void dump() {
         try {
-            PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
-            DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
+            Connection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
+            DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
             PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
                     PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
             while (isRunning()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
index 384f9c7..80f2ee6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
@@ -23,6 +23,7 @@ import org.postgresql.PGProperty;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.PGReplicationStream;
 
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Properties;
@@ -33,16 +34,26 @@ import java.util.Properties;
 public final class LogicalReplication {
     
     /**
-     *  Create PostgreSQL connection.
+     * Create PostgreSQL connection.
      *
      * @param jdbcDataSourceConfig JDBC data source configuration
      * @return PostgreSQL connection
      * @throws SQLException sql exception
      */
-    public PGConnection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+    public Connection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
         return createConnection(jdbcDataSourceConfig);
     }
     
+    private Connection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+        Properties props = new Properties();
+        PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
+        PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
+        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
+        PGProperty.REPLICATION.set(props, "database");
+        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
+        return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props);
+    }
+    
     /**
      * Create PostgreSQL replication stream.
      *
@@ -52,8 +63,8 @@ public final class LogicalReplication {
      * @return replication stream
      * @throws SQLException sql exception
      */
-    public PGReplicationStream createReplicationStream(final PGConnection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
-        return pgConnection.getReplicationAPI()
+    public PGReplicationStream createReplicationStream(final Connection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
+        return pgConnection.unwrap(PGConnection.class).getReplicationAPI()
                 .replicationStream()
                 .logical()
                 .withStartPosition(startPosition)
@@ -62,14 +73,4 @@ public final class LogicalReplication {
                 .withSlotOption("skip-empty-xacts", true)
                 .start();
     }
-    
-    private PGConnection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
-        Properties props = new Properties();
-        PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
-        PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
-        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
-        PGProperty.REPLICATION.set(props, "database");
-        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
-        return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props).unwrap(PGConnection.class);
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
index 04bc399..252d6cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
@@ -42,12 +42,18 @@ import static org.mockito.Mockito.when;
 public final class PostgreSQLDataSourceCheckerTest {
 
     private static final String CATALOG = "test";
-
+    
     @Mock
     private Connection connection;
-
+    
     @Mock
     private PreparedStatement preparedStatement;
+    
+    @Mock
+    private ResultSet resultSet;
+    
+    @Mock
+    private DatabaseMetaData metaData;
 
     private Collection<DataSource> dataSources;
 
@@ -56,40 +62,47 @@ public final class PostgreSQLDataSourceCheckerTest {
         DataSource dataSource = mock(DataSource.class);
         Connection connection = mockConnection();
         when(dataSource.getConnection()).thenReturn(connection);
+        when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
         dataSources = new LinkedList<>();
         dataSources.add(dataSource);
     }
 
     @SneakyThrows
     private Connection mockConnection() {
-        DatabaseMetaData metaData = mock(DatabaseMetaData.class);
         when(connection.getMetaData()).thenReturn(metaData);
-        ResultSet resultSet = mockResultSet();
-        when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
         when(connection.getCatalog()).thenReturn(CATALOG);
         when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenReturn(preparedStatement);
         return connection;
     }
 
-    @SneakyThrows
-    private ResultSet mockResultSet() {
-        ResultSet resultSet = mock(ResultSet.class);
-        when(resultSet.next()).thenReturn(true);
-        when(resultSet.getString(3)).thenReturn("test");
-        return resultSet;
-    }
-
     @Test
     public void assertCheckPrivilege() throws SQLException {
+        when(resultSet.next()).thenReturn(true);
+        when(resultSet.getString(3)).thenReturn("test");
         PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
         dataSourceChecker.checkPrivilege(dataSources);
         verify(preparedStatement).executeQuery();
     }
-
+    
     @Test(expected = PrepareFailedException.class)
-    public void assertCheckPrivilegeFailed() throws SQLException {
-        when(preparedStatement.executeQuery()).thenThrow(new SQLException());
+    public void assertCheckPrivilegeWithoutTable() throws SQLException {
+        when(resultSet.next()).thenReturn(false);
         PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
         dataSourceChecker.checkPrivilege(dataSources);
     }
+    
+    @Test(expected = PrepareFailedException.class)
+    public void assertCheckPrivilegeFailure() throws SQLException {
+        when(resultSet.next()).thenReturn(true);
+        when(resultSet.getString(3)).thenReturn("test");
+        when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenThrow(new SQLException());
+        PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
+        dataSourceChecker.checkPrivilege(dataSources);
+    }
+    
+    @Test
+    public void assertCheckVariable() {
+        PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
+        dataSourceChecker.checkVariable(dataSources);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
new file mode 100644
index 0000000..0aae445
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class PostgreSQLImporterTest {
+    
+    @Mock
+    private ImporterConfiguration importerConfiguration;
+    
+    @Mock
+    private DataSourceManager dataSourceManager;
+    
+    @Test
+    public void assertCreateSqlBuilder() {
+        PostgreSQLImporter mySQLImporter = new PostgreSQLImporter(importerConfiguration, dataSourceManager);
+        String insertSQL = mySQLImporter.createSqlBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+    }
+    
+    private DataRecord mockDataRecord() {
+        DataRecord result = new DataRecord(new WalPosition(LogSequenceNumber.valueOf(100L)), 2);
+        result.setTableName("t_order");
+        result.addColumn(new Column("id", 1, true, true));
+        result.addColumn(new Column("name", "", true, false));
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
similarity index 70%
copy from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
copy to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
index 506170e..1c64913 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.mysql;
+package org.apache.shardingsphere.scaling.postgresql;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
@@ -24,35 +24,26 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Types;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
-public final class MySQLJdbcDumperTest {
+public final class PostgreSQLJdbcDumperTest {
     
     private DataSourceManager dataSourceManager;
     
-    private MySQLJdbcDumper mySQLJdbcDumper;
+    private PostgreSQLJdbcDumper postgreSQLJdbcDumper;
     
     @Before
     public void setUp() {
         dataSourceManager = new DataSourceManager();
-        mySQLJdbcDumper = new MySQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
+        postgreSQLJdbcDumper = new PostgreSQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
     }
     
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
@@ -63,13 +54,6 @@ public final class MySQLJdbcDumperTest {
         return result;
     }
     
-    private DumperConfiguration mockDumperConfiguration() {
-        DumperConfiguration dumperConfiguration = new DumperConfiguration();
-        dumperConfiguration.setDataSourceConfiguration(
-                new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
-        return dumperConfiguration;
-    }
-    
     @SneakyThrows(SQLException.class)
     private void initTableData(final DumperConfiguration dumperConfig) {
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
@@ -83,25 +67,18 @@ public final class MySQLJdbcDumperTest {
     
     @Test
     @SneakyThrows(SQLException.class)
-    public void assertReadValue() {
-        ResultSet resultSet = mock(ResultSet.class);
-        ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
-        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
-        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.TIMESTAMP);
-        when(resultSetMetaData.getColumnType(2)).thenReturn(Types.VARCHAR);
-        mySQLJdbcDumper.readValue(resultSet, 1);
-        mySQLJdbcDumper.readValue(resultSet, 2);
-        verify(resultSet).getString(1);
-        verify(resultSet).getObject(2);
-    }
-    
-    @Test
-    @SneakyThrows(SQLException.class)
     public void assertCreatePreparedStatement() {
         DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfiguration());
         try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = mySQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+             PreparedStatement preparedStatement = postgreSQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
             assertThat(preparedStatement.getFetchSize(), is(100));
         }
     }
+    
+    private DumperConfiguration mockDumperConfiguration() {
+        DumperConfiguration dumperConfiguration = new DumperConfiguration();
+        dumperConfiguration.setDataSourceConfiguration(
+                new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+        return dumperConfiguration;
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java
new file mode 100644
index 0000000..5582485
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
+import org.apache.shardingsphere.scaling.postgresql.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class PostgreSQLWalDumperTest {
+    
+    @Mock
+    private LogicalReplication logicalReplication;
+    
+    @Mock
+    private PgConnection pgConnection;
+    
+    @Mock
+    private PGReplicationStream pgReplicationStream;
+    
+    private WalPosition position;
+    
+    private PostgreSQLWalDumper postgreSQLWalDumper;
+    
+    private JDBCDataSourceConfiguration jdbcDataSourceConfiguration;
+    
+    private MemoryChannel channel;
+    
+    @Before
+    public void setUp() {
+        ScalingContext.getInstance().init(new ServerConfiguration());
+        position = new WalPosition(LogSequenceNumber.valueOf(100L));
+        postgreSQLWalDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position);
+        channel = new MemoryChannel(records -> {
+        });
+        postgreSQLWalDumper.setChannel(channel);
+    }
+    
+    private DumperConfiguration mockDumperConfiguration() {
+        jdbcDataSourceConfiguration = new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root");
+        DumperConfiguration dumperConfiguration = new DumperConfiguration();
+        dumperConfiguration.setDataSourceConfiguration(jdbcDataSourceConfiguration);
+        return dumperConfiguration;
+    }
+    
+    @Test
+    @SneakyThrows({ReflectiveOperationException.class, SQLException.class})
+    public void assertStart() {
+        try {
+            ReflectionUtil.setFieldValueToClass(postgreSQLWalDumper, "logicalReplication", logicalReplication);
+            when(logicalReplication.createPgConnection(jdbcDataSourceConfiguration)).thenReturn(pgConnection);
+            when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+            when(pgConnection.getTimestampUtils()).thenReturn(null);
+            when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
+            ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
+            when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException());
+            when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
+            postgreSQLWalDumper.start();
+        } catch (SyncTaskExecuteException ignore) {
+        }
+        assertThat(channel.fetchRecords(100, 0).size(), is(1));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java
new file mode 100644
index 0000000..ce0bd89
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.Field;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectionUtil {
+    
+    /**
+     * Get field from class.
+     *
+     * @param targetClass target class
+     * @param fieldName field name
+     * @param isDeclared is declared
+     * @return {@link Field}
+     * @throws NoSuchFieldException no such field exception
+     */
+    public static Field getFieldFromClass(final Class<?> targetClass, final String fieldName, final boolean isDeclared) throws NoSuchFieldException {
+        Field targetField;
+        if (isDeclared) {
+            targetField = targetClass.getDeclaredField(fieldName);
+        } else {
+            targetField = targetClass.getField(fieldName);
+        }
+        targetField.setAccessible(true);
+        return targetField;
+    }
+    
+    /**
+     * Get field value from instance target object.
+     *
+     * @param target target object
+     * @param fieldName field name
+     * @param valueClass expected value class
+     * @param <T> expected value class
+     * @return target filed value
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getFieldValueFromClass(final Object target, final String fieldName, final Class<T> valueClass) throws NoSuchFieldException, IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        Object value = field.get(target);
+        if (null == value) {
+            return null;
+        }
+        if (value.getClass().isAssignableFrom(value.getClass())) {
+            return (T) value;
+        }
+        throw new ClassCastException("field " + fieldName + " is " + target.getClass().getName() + " can cast to " + valueClass.getName());
+    }
+    
+    /**
+     * Set value to target object field.
+     *
+     * @param target target object
+     * @param fieldName field name
+     * @param value new value
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     */
+    public static void setFieldValueToClass(final Object target, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        field.set(target, value);
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
new file mode 100644
index 0000000..42cd7f0
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.PGConnection;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationConnection;
+import org.postgresql.replication.fluent.ChainedStreamBuilder;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class LogicalReplicationTest {
+    
+    @Mock
+    private PgConnection pgConnection;
+    
+    @Mock
+    private PGReplicationConnection pgReplicationConnection;
+    
+    @Mock
+    private ChainedStreamBuilder chainedStreamBuilder;
+    
+    @Mock
+    private ChainedLogicalStreamBuilder chainedLogicalStreamBuilder;
+    
+    private LogicalReplication logicalReplication;
+    
+    @Before
+    public void setUp() {
+        logicalReplication = new LogicalReplication();
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertCreatePgConnectionSuccess() {
+        Connection pgConnection = logicalReplication.createPgConnection(
+                new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+        assertFalse(pgConnection.isClosed());
+    }
+    
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertCreateReplicationStreamSuccess() {
+        LogSequenceNumber startPosition = LogSequenceNumber.valueOf(100L);
+        when(pgConnection.unwrap(PGConnection.class)).thenReturn(pgConnection);
+        when(pgConnection.getReplicationAPI()).thenReturn(pgReplicationConnection);
+        when(pgReplicationConnection.replicationStream()).thenReturn(chainedStreamBuilder);
+        when(chainedStreamBuilder.logical()).thenReturn(chainedLogicalStreamBuilder);
+        when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder);
+        when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder);
+        when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
+        logicalReplication.createReplicationStream(pgConnection, "", startPosition);
+        verify(chainedLogicalStreamBuilder).start();
+    }
+    
+    @Test(expected = SQLException.class)
+    @SneakyThrows(SQLException.class)
+    public void assertCreateReplicationStreamFailure() {
+        when(pgConnection.unwrap(PGConnection.class)).thenThrow(new SQLException());
+        logicalReplication.createReplicationStream(pgConnection, "", LogSequenceNumber.valueOf(100L));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java
new file mode 100644
index 0000000..10d99b1
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class WalEventConverterTest {
+    
+    private WalEventConverter walEventConverter;
+    
+    @Before
+    public void setUp() {
+        DumperConfiguration dumperConfiguration = mockDumperConfiguration();
+        walEventConverter = new WalEventConverter(dumperConfiguration);
+        initTableData(dumperConfiguration);
+    }
+    
+    private DumperConfiguration mockDumperConfiguration() {
+        DumperConfiguration reslut = new DumperConfiguration();
+        reslut.setDataSourceConfiguration(new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+        Map<String, String> tableNameMap = Maps.newHashMap();
+        tableNameMap.put("t_order", "t_order");
+        reslut.setTableNameMap(tableNameMap);
+        return reslut;
+    }
+    
+    @SneakyThrows(SQLException.class)
+    private void initTableData(final DumperConfiguration dumperConfig) {
+        DataSource dataSource = new DataSourceManager().getDataSource(dumperConfig.getDataSourceConfiguration());
+        try (Connection connection = dataSource.getConnection();
+             Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+        }
+    }
+    
+    @Test
+    public void assertConvertWriteRowEvent() {
+        Record record = walEventConverter.convert(mockWriteRowEvent());
+        assertTrue(record instanceof DataRecord);
+        assertThat(((DataRecord) record).getType(), is(ScalingConstant.INSERT));
+    }
+    
+    @Test
+    public void assertConvertUpdateRowEvent() {
+        Record record = walEventConverter.convert(mockUpdateRowEvent());
+        assertTrue(record instanceof DataRecord);
+        assertThat(((DataRecord) record).getType(), is(ScalingConstant.UPDATE));
+    }
+    
+    @Test
+    public void assertConvertDeleteRowEvent() {
+        Record record = walEventConverter.convert(mockDeleteRowEvent());
+        assertTrue(record instanceof DataRecord);
+        assertThat(((DataRecord) record).getType(), is(ScalingConstant.DELETE));
+    }
+    
+    @Test
+    public void assertConvertPlaceholderEvent() {
+        Record record = walEventConverter.convert(new PlaceholderEvent());
+        assertTrue(record instanceof PlaceholderRecord);
+    }
+    
+    @Test
+    public void assertUnknownTable() {
+        Record record = walEventConverter.convert(mockUnknownTableEvent());
+        assertTrue(record instanceof PlaceholderRecord);
+    }
+    
+    @Test(expected = UnsupportedOperationException.class)
+    public void assertConvertFailure() {
+        walEventConverter.convert(new AbstractRowEvent());
+    }
+    
+    private AbstractRowEvent mockWriteRowEvent() {
+        WriteRowEvent result = new WriteRowEvent();
+        result.setSchemaName("");
+        result.setTableName("t_order");
+        result.setAfterRow(Lists.newArrayList("id", "user_id"));
+        return result;
+    }
+    
+    private AbstractRowEvent mockUpdateRowEvent() {
+        UpdateRowEvent result = new UpdateRowEvent();
+        result.setSchemaName("");
+        result.setTableName("t_order");
+        result.setAfterRow(Lists.newArrayList("id", "user_id"));
+        return result;
+    }
+    
+    private AbstractRowEvent mockDeleteRowEvent() {
+        DeleteRowEvent result = new DeleteRowEvent();
+        result.setSchemaName("");
+        result.setTableName("t_order");
+        result.setPrimaryKeys(Lists.newArrayList("id"));
+        return result;
+    }
+    
+    private AbstractRowEvent mockUnknownTableEvent() {
+        WriteRowEvent result = new WriteRowEvent();
+        result.setSchemaName("");
+        result.setTableName("t_other");
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
new file mode 100644
index 0000000..b6c8a1d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+
+public final class WalPositionTest {
+    
+    @Test
+    public void assertCompareTo() {
+        WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
+        assertThat(walPosition.compareTo(null), is(1));
+        assertThat(walPosition.compareTo(new WalPosition(LogSequenceNumber.valueOf(100L))), is(0));
+    }
+    
+    @Test
+    public void assertToJson() {
+        WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
+        assertThat(walPosition.toJson().toString(), is("100"));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
index 6bd6966..eb800bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
@@ -17,55 +17,86 @@
 
 package org.apache.shardingsphere.scaling.postgresql.wal.decode;
 
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
 import org.junit.Test;
+import org.postgresql.jdbc.TimestampUtils;
 import org.postgresql.replication.LogSequenceNumber;
+
 import java.nio.ByteBuffer;
+import java.sql.SQLException;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public final class TestDecodingPluginTest {
     
+    private LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
+    
     @Test
     public void assertDecodeWriteRowEvent() {
-        LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
         ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[character varying]:'1 2 3'''".getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
-        assertThat(actual.getLogSequenceNumber(), is(lsn));
+        WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
     }
     
     @Test
     public void assertDecodeUpdateRowEvent() {
-        LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
         ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: data[character varying]:'1 2 3'''".getBytes());
-        UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
-        assertThat(actual.getLogSequenceNumber(), is(lsn));
+        UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
     }
     
     @Test
     public void assertDecodeDeleteRowEvent() {
-        LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
         ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
-        DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
-        assertThat(actual.getLogSequenceNumber(), is(lsn));
+        DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         assertThat(actual.getPrimaryKeys().get(0), is(1));
     }
     
     @Test
     public void assertDecodeWriteRowEventWithByteA() {
-        LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
         ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
-        assertThat(actual.getLogSequenceNumber(), is(lsn));
+        WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
-        assertThat(actual.getAfterRow().get(0), is(new byte[] {(byte) 0xff, (byte) 0, (byte) 0xab}));
+        assertThat(actual.getAfterRow().get(0), is(new byte[]{(byte) 0xff, (byte) 0, (byte) 0xab}));
+    }
+    
+    @Test
+    public void assertDecodeUnknownTableType() {
+        ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
+        AbstractWalEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertTrue(actual instanceof PlaceholderEvent);
+    }
+    
+    @Test(expected = SyncTaskExecuteException.class)
+    public void assertDecodeUnknownRowEventType() {
+        ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes());
+        new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+    }
+    
+    @Test(expected = DecodingException.class)
+    @SneakyThrows(SQLException.class)
+    public void assertDecodeTime() {
+        TimestampUtils timestampUtils = mock(TimestampUtils.class);
+        when(timestampUtils.toTime(eq(null), eq("1 2 3'"))).thenThrow(new SQLException());
+        ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes());
+        new TestDecodingPlugin(timestampUtils).decode(data, logSequenceNumber);
     }
 }