You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/21 11:39:18 UTC
[shardingsphere] branch master updated: add unit test for
shardingsphere-scaling-postgresql (#6983)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4e28d94 add unit test for shardingsphere-scaling-postgresql (#6983)
4e28d94 is described below
commit 4e28d946c00e9faf15f86c9fe6bc21cc2baaf8ed
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Aug 21 19:39:03 2020 +0800
add unit test for shardingsphere-scaling-postgresql (#6983)
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/mysql/MySQLJdbcDumperTest.java | 3 -
.../shardingsphere-scaling-postgresql/pom.xml | 5 +
.../scaling/postgresql/PostgreSQLJdbcDumper.java | 2 +-
.../scaling/postgresql/PostgreSQLWalDumper.java | 5 +-
.../scaling/postgresql/wal/LogicalReplication.java | 29 ++--
.../PostgreSQLDataSourceCheckerTest.java | 45 ++++---
.../scaling/postgresql/PostgreSQLImporterTest.java | 57 ++++++++
.../postgresql/PostgreSQLJdbcDumperTest.java} | 47 ++-----
.../postgresql/PostgreSQLWalDumperTest.java | 100 ++++++++++++++
.../scaling/postgresql/utils/ReflectionUtil.java | 85 ++++++++++++
.../postgresql/wal/LogicalReplicationTest.java | 94 +++++++++++++
.../postgresql/wal/WalEventConverterTest.java | 147 +++++++++++++++++++++
.../scaling/postgresql/wal/WalPositionTest.java | 40 ++++++
.../wal/decode/TestDecodingPluginTest.java | 57 ++++++--
14 files changed, 631 insertions(+), 85 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
index 506170e..f2cb4e7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
@@ -24,8 +24,6 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -42,7 +40,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
public final class MySQLJdbcDumperTest {
private DataSourceManager dataSourceManager;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
index 5c0af86..91b1b80 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/pom.xml
@@ -40,5 +40,10 @@
<artifactId>postgresql</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
index a29f0c2..ad4da9a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
@@ -38,7 +38,7 @@ public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper {
@Override
protected PreparedStatement createPreparedStatement(final Connection conn, final String sql) throws SQLException {
PreparedStatement result = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- result.setFetchSize(1);
+ result.setFetchSize(100);
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 8965bc5..50f5b10 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
-import org.postgresql.PGConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
@@ -74,8 +73,8 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private void dump() {
try {
- PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
- DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
+ Connection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
+ DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
while (isRunning()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
index 384f9c7..80f2ee6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplication.java
@@ -23,6 +23,7 @@ import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
+import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
@@ -33,16 +34,26 @@ import java.util.Properties;
public final class LogicalReplication {
/**
- * Create PostgreSQL connection.
+ * Create PostgreSQL connection.
*
* @param jdbcDataSourceConfig JDBC data source configuration
* @return PostgreSQL connection
* @throws SQLException sql exception
*/
- public PGConnection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+ public Connection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
return createConnection(jdbcDataSourceConfig);
}
+ private Connection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+ Properties props = new Properties();
+ PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
+ PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
+ PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
+ PGProperty.REPLICATION.set(props, "database");
+ PGProperty.PREFER_QUERY_MODE.set(props, "simple");
+ return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props);
+ }
+
/**
* Create PostgreSQL replication stream.
*
@@ -52,8 +63,8 @@ public final class LogicalReplication {
* @return replication stream
* @throws SQLException sql exception
*/
- public PGReplicationStream createReplicationStream(final PGConnection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
- return pgConnection.getReplicationAPI()
+ public PGReplicationStream createReplicationStream(final Connection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
+ return pgConnection.unwrap(PGConnection.class).getReplicationAPI()
.replicationStream()
.logical()
.withStartPosition(startPosition)
@@ -62,14 +73,4 @@ public final class LogicalReplication {
.withSlotOption("skip-empty-xacts", true)
.start();
}
-
- private PGConnection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
- Properties props = new Properties();
- PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
- PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
- PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
- PGProperty.REPLICATION.set(props, "database");
- PGProperty.PREFER_QUERY_MODE.set(props, "simple");
- return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props).unwrap(PGConnection.class);
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
index 04bc399..252d6cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataSourceCheckerTest.java
@@ -42,12 +42,18 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLDataSourceCheckerTest {
private static final String CATALOG = "test";
-
+
@Mock
private Connection connection;
-
+
@Mock
private PreparedStatement preparedStatement;
+
+ @Mock
+ private ResultSet resultSet;
+
+ @Mock
+ private DatabaseMetaData metaData;
private Collection<DataSource> dataSources;
@@ -56,40 +62,47 @@ public final class PostgreSQLDataSourceCheckerTest {
DataSource dataSource = mock(DataSource.class);
Connection connection = mockConnection();
when(dataSource.getConnection()).thenReturn(connection);
+ when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
dataSources = new LinkedList<>();
dataSources.add(dataSource);
}
@SneakyThrows
private Connection mockConnection() {
- DatabaseMetaData metaData = mock(DatabaseMetaData.class);
when(connection.getMetaData()).thenReturn(metaData);
- ResultSet resultSet = mockResultSet();
- when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
when(connection.getCatalog()).thenReturn(CATALOG);
when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenReturn(preparedStatement);
return connection;
}
- @SneakyThrows
- private ResultSet mockResultSet() {
- ResultSet resultSet = mock(ResultSet.class);
- when(resultSet.next()).thenReturn(true);
- when(resultSet.getString(3)).thenReturn("test");
- return resultSet;
- }
-
@Test
public void assertCheckPrivilege() throws SQLException {
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(3)).thenReturn("test");
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement).executeQuery();
}
-
+
@Test(expected = PrepareFailedException.class)
- public void assertCheckPrivilegeFailed() throws SQLException {
- when(preparedStatement.executeQuery()).thenThrow(new SQLException());
+ public void assertCheckPrivilegeWithoutTable() throws SQLException {
+ when(resultSet.next()).thenReturn(false);
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkPrivilege(dataSources);
}
+
+ @Test(expected = PrepareFailedException.class)
+ public void assertCheckPrivilegeFailure() throws SQLException {
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(3)).thenReturn("test");
+ when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenThrow(new SQLException());
+ PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
+ dataSourceChecker.checkPrivilege(dataSources);
+ }
+
+ @Test
+ public void assertCheckVariable() {
+ PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
+ dataSourceChecker.checkVariable(dataSources);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
new file mode 100644
index 0000000..0aae445
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class PostgreSQLImporterTest {
+
+    @Mock
+    private ImporterConfiguration importerConfiguration;
+
+    @Mock
+    private DataSourceManager dataSourceManager;
+
+    @Test
+    public void assertCreateSqlBuilder() {
+        // The importer only needs its collaborators to exist here; both are plain mocks.
+        PostgreSQLImporter importer = new PostgreSQLImporter(importerConfiguration, dataSourceManager);
+        String expected = "INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING";
+        String actual = importer.createSqlBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(actual, is(expected));
+    }
+
+    // Builds a two-column record for table t_order positioned at WAL sequence number 100.
+    private DataRecord mockDataRecord() {
+        WalPosition position = new WalPosition(LogSequenceNumber.valueOf(100L));
+        DataRecord result = new DataRecord(position, 2);
+        result.setTableName("t_order");
+        // NOTE(review): the last Column flag presumably marks the primary key, which would explain
+        // why only "id" appears in the ON CONFLICT clause - confirm against Column's constructor.
+        result.addColumn(new Column("id", 1, true, true));
+        result.addColumn(new Column("name", "", true, false));
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
similarity index 70%
copy from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
copy to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
index 506170e..1c64913 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumperTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.mysql;
+package org.apache.shardingsphere.scaling.postgresql;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
@@ -24,35 +24,26 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
-public final class MySQLJdbcDumperTest {
+public final class PostgreSQLJdbcDumperTest {
private DataSourceManager dataSourceManager;
- private MySQLJdbcDumper mySQLJdbcDumper;
+ private PostgreSQLJdbcDumper postgreSQLJdbcDumper;
@Before
public void setUp() {
dataSourceManager = new DataSourceManager();
- mySQLJdbcDumper = new MySQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
+ postgreSQLJdbcDumper = new PostgreSQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
}
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
@@ -63,13 +54,6 @@ public final class MySQLJdbcDumperTest {
return result;
}
- private DumperConfiguration mockDumperConfiguration() {
- DumperConfiguration dumperConfiguration = new DumperConfiguration();
- dumperConfiguration.setDataSourceConfiguration(
- new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
- return dumperConfiguration;
- }
-
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
@@ -83,25 +67,18 @@ public final class MySQLJdbcDumperTest {
@Test
@SneakyThrows(SQLException.class)
- public void assertReadValue() {
- ResultSet resultSet = mock(ResultSet.class);
- ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
- when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
- when(resultSetMetaData.getColumnType(1)).thenReturn(Types.TIMESTAMP);
- when(resultSetMetaData.getColumnType(2)).thenReturn(Types.VARCHAR);
- mySQLJdbcDumper.readValue(resultSet, 1);
- mySQLJdbcDumper.readValue(resultSet, 2);
- verify(resultSet).getString(1);
- verify(resultSet).getObject(2);
- }
-
- @Test
- @SneakyThrows(SQLException.class)
public void assertCreatePreparedStatement() {
DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = mySQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+ PreparedStatement preparedStatement = postgreSQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
assertThat(preparedStatement.getFetchSize(), is(100));
}
}
+
+ private DumperConfiguration mockDumperConfiguration() {
+ DumperConfiguration dumperConfiguration = new DumperConfiguration();
+ dumperConfiguration.setDataSourceConfiguration(
+ new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+ return dumperConfiguration;
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java
new file mode 100644
index 0000000..5582485
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumperTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
+import org.apache.shardingsphere.scaling.postgresql.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationStream;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+// Unit test that drives PostgreSQLWalDumper#start() against mocked replication objects
+// and checks what ends up in the in-memory channel.
+@RunWith(MockitoJUnitRunner.class)
+public final class PostgreSQLWalDumperTest {
+
+ @Mock
+ private LogicalReplication logicalReplication;
+
+ @Mock
+ private PgConnection pgConnection;
+
+ @Mock
+ private PGReplicationStream pgReplicationStream;
+
+ private WalPosition position;
+
+ private PostgreSQLWalDumper postgreSQLWalDumper;
+
+ private JDBCDataSourceConfiguration jdbcDataSourceConfiguration;
+
+ private MemoryChannel channel;
+
+ // Creates the dumper under test at log sequence number 100 and attaches a memory channel
+ // whose callback does nothing.
+ @Before
+ public void setUp() {
+ ScalingContext.getInstance().init(new ServerConfiguration());
+ position = new WalPosition(LogSequenceNumber.valueOf(100L));
+ postgreSQLWalDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position);
+ channel = new MemoryChannel(records -> {
+ });
+ postgreSQLWalDumper.setChannel(channel);
+ }
+
+ // Builds a dumper configuration backed by an H2 JDBC URL in PostgreSQL mode; the data source
+ // configuration is kept in a field so assertStart() can stub createPgConnection with it.
+ private DumperConfiguration mockDumperConfiguration() {
+ jdbcDataSourceConfiguration = new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root");
+ DumperConfiguration dumperConfiguration = new DumperConfiguration();
+ dumperConfiguration.setDataSourceConfiguration(jdbcDataSourceConfiguration);
+ return dumperConfiguration;
+ }
+
+ @Test
+ @SneakyThrows({ReflectiveOperationException.class, SQLException.class})
+ public void assertStart() {
+ try {
+ // Replace the dumper's "logicalReplication" field with the mock via reflection.
+ ReflectionUtil.setFieldValueToClass(postgreSQLWalDumper, "logicalReplication", logicalReplication);
+ when(logicalReplication.createPgConnection(jdbcDataSourceConfiguration)).thenReturn(pgConnection);
+ when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+ when(pgConnection.getTimestampUtils()).thenReturn(null);
+ when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
+ ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
+ // Successive reads: nothing pending, then one DELETE event, then a SQLException.
+ when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException());
+ when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
+ postgreSQLWalDumper.start();
+ // NOTE(review): presumably the stubbed SQLException surfaces as SyncTaskExecuteException and is
+ // what ends start(); it is swallowed on purpose so the assertion below still runs - confirm.
+ } catch (SyncTaskExecuteException ignore) {
+ }
+ // Exactly one record is expected in the channel after the run.
+ assertThat(channel.fetchRecords(100, 0).size(), is(1));
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java
new file mode 100644
index 0000000..ce0bd89
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/utils/ReflectionUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.Field;
+
+/**
+ * Reflection helpers for tests that need to read or replace fields of an object.
+ *
+ * <p>All methods are static; the private constructor prevents instantiation.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectionUtil {
+
+    /**
+     * Get field from class and make it accessible.
+     *
+     * @param targetClass target class
+     * @param fieldName field name
+     * @param isDeclared {@code true} to look the field up with {@link Class#getDeclaredField(String)}
+     *                   (any visibility, declared on {@code targetClass} itself),
+     *                   {@code false} to use {@link Class#getField(String)} (public fields only)
+     * @return {@link Field}
+     * @throws NoSuchFieldException no such field exception
+     */
+    public static Field getFieldFromClass(final Class<?> targetClass, final String fieldName, final boolean isDeclared) throws NoSuchFieldException {
+        Field targetField = isDeclared ? targetClass.getDeclaredField(fieldName) : targetClass.getField(fieldName);
+        targetField.setAccessible(true);
+        return targetField;
+    }
+
+    /**
+     * Get field value from instance target object.
+     *
+     * <p>The field is looked up among the fields declared on the runtime class of {@code target}.
+     * A {@code null} field value is returned as {@code null} without any type check.
+     * Pass the wrapper type (for example {@code Integer.class}) for primitive fields,
+     * a primitive class token never matches a boxed value.</p>
+     *
+     * @param target target object
+     * @param fieldName field name
+     * @param valueClass expected value class
+     * @param <T> expected value class
+     * @return target field value
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     * @throws ClassCastException if the field value is not an instance of {@code valueClass}
+     */
+    public static <T> T getFieldValueFromClass(final Object target, final String fieldName, final Class<T> valueClass) throws NoSuchFieldException, IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        Object value = field.get(target);
+        if (null == value) {
+            return null;
+        }
+        // Check against the requested type; comparing the value's class with itself is always true.
+        if (valueClass.isInstance(value)) {
+            return valueClass.cast(value);
+        }
+        throw new ClassCastException("field " + fieldName + " is " + value.getClass().getName() + " can not cast to " + valueClass.getName());
+    }
+
+    /**
+     * Set value to target object field.
+     *
+     * <p>The field is looked up among the fields declared on the runtime class of {@code target}.</p>
+     *
+     * @param target target object
+     * @param fieldName field name
+     * @param value new value
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     */
+    public static void setFieldValueToClass(final Object target, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        field.set(target, value);
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
new file mode 100644
index 0000000..42cd7f0
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.postgresql.PGConnection;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.replication.PGReplicationConnection;
+import org.postgresql.replication.fluent.ChainedStreamBuilder;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link LogicalReplication}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class LogicalReplicationTest {
+
+    @Mock
+    private PgConnection pgConnection;
+
+    @Mock
+    private PGReplicationConnection pgReplicationConnection;
+
+    @Mock
+    private ChainedStreamBuilder chainedStreamBuilder;
+
+    @Mock
+    private ChainedLogicalStreamBuilder chainedLogicalStreamBuilder;
+
+    private LogicalReplication logicalReplication;
+
+    @Before
+    public void setUp() {
+        logicalReplication = new LogicalReplication();
+    }
+
+    /**
+     * A connection created from the in-memory H2 URL must be open when it is returned.
+     */
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertCreatePgConnectionSuccess() {
+        JDBCDataSourceConfiguration dataSourceConfig = new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root");
+        // Named "connection" so the mocked pgConnection field is not shadowed;
+        // try-with-resources closes the connection once the assertion has run.
+        try (Connection connection = logicalReplication.createPgConnection(dataSourceConfig)) {
+            assertFalse(connection.isClosed());
+        }
+    }
+
+    /**
+     * With the replication builder chain mocked, creating a stream must finish by calling {@code start()}.
+     */
+    @Test
+    @SneakyThrows(SQLException.class)
+    public void assertCreateReplicationStreamSuccess() {
+        LogSequenceNumber startPosition = LogSequenceNumber.valueOf(100L);
+        when(pgConnection.unwrap(PGConnection.class)).thenReturn(pgConnection);
+        when(pgConnection.getReplicationAPI()).thenReturn(pgReplicationConnection);
+        when(pgReplicationConnection.replicationStream()).thenReturn(chainedStreamBuilder);
+        when(chainedStreamBuilder.logical()).thenReturn(chainedLogicalStreamBuilder);
+        when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder);
+        when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder);
+        // One stubbed value is enough: Mockito keeps returning it for every later matching call.
+        when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder);
+        logicalReplication.createReplicationStream(pgConnection, "", startPosition);
+        verify(chainedLogicalStreamBuilder).start();
+    }
+
+    /**
+     * A {@link SQLException} raised while unwrapping the connection must reach the caller.
+     */
+    @Test(expected = SQLException.class)
+    @SneakyThrows(SQLException.class)
+    public void assertCreateReplicationStreamFailure() {
+        when(pgConnection.unwrap(PGConnection.class)).thenThrow(new SQLException());
+        logicalReplication.createReplicationStream(pgConnection, "", LogSequenceNumber.valueOf(100L));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java
new file mode 100644
index 0000000..10d99b1
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class WalEventConverterTest {
+
+ private WalEventConverter walEventConverter;
+
+ @Before
+ public void setUp() {
+ DumperConfiguration dumperConfiguration = mockDumperConfiguration();
+ walEventConverter = new WalEventConverter(dumperConfiguration);
+ initTableData(dumperConfiguration);
+ }
+
+ private DumperConfiguration mockDumperConfiguration() {
+ DumperConfiguration reslut = new DumperConfiguration();
+ reslut.setDataSourceConfiguration(new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+ Map<String, String> tableNameMap = Maps.newHashMap();
+ tableNameMap.put("t_order", "t_order");
+ reslut.setTableNameMap(tableNameMap);
+ return reslut;
+ }
+
+ @SneakyThrows(SQLException.class)
+ private void initTableData(final DumperConfiguration dumperConfig) {
+ DataSource dataSource = new DataSourceManager().getDataSource(dumperConfig.getDataSourceConfiguration());
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE IF EXISTS t_order");
+ statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
+ statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ }
+ }
+
+ @Test
+ public void assertConvertWriteRowEvent() {
+ Record record = walEventConverter.convert(mockWriteRowEvent());
+ assertTrue(record instanceof DataRecord);
+ assertThat(((DataRecord) record).getType(), is(ScalingConstant.INSERT));
+ }
+
+ @Test
+ public void assertConvertUpdateRowEvent() {
+ Record record = walEventConverter.convert(mockUpdateRowEvent());
+ assertTrue(record instanceof DataRecord);
+ assertThat(((DataRecord) record).getType(), is(ScalingConstant.UPDATE));
+ }
+
+ @Test
+ public void assertConvertDeleteRowEvent() {
+ Record record = walEventConverter.convert(mockDeleteRowEvent());
+ assertTrue(record instanceof DataRecord);
+ assertThat(((DataRecord) record).getType(), is(ScalingConstant.DELETE));
+ }
+
+ @Test
+ public void assertConvertPlaceholderEvent() {
+ Record record = walEventConverter.convert(new PlaceholderEvent());
+ assertTrue(record instanceof PlaceholderRecord);
+ }
+
+ @Test
+ public void assertUnknownTable() {
+ Record record = walEventConverter.convert(mockUnknownTableEvent());
+ assertTrue(record instanceof PlaceholderRecord);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void assertConvertFailure() {
+ walEventConverter.convert(new AbstractRowEvent());
+ }
+
+ private AbstractRowEvent mockWriteRowEvent() {
+ WriteRowEvent result = new WriteRowEvent();
+ result.setSchemaName("");
+ result.setTableName("t_order");
+ result.setAfterRow(Lists.newArrayList("id", "user_id"));
+ return result;
+ }
+
+ private AbstractRowEvent mockUpdateRowEvent() {
+ UpdateRowEvent result = new UpdateRowEvent();
+ result.setSchemaName("");
+ result.setTableName("t_order");
+ result.setAfterRow(Lists.newArrayList("id", "user_id"));
+ return result;
+ }
+
+ private AbstractRowEvent mockDeleteRowEvent() {
+ DeleteRowEvent result = new DeleteRowEvent();
+ result.setSchemaName("");
+ result.setTableName("t_order");
+ result.setPrimaryKeys(Lists.newArrayList("id"));
+ return result;
+ }
+
+ private AbstractRowEvent mockUnknownTableEvent() {
+ WriteRowEvent result = new WriteRowEvent();
+ result.setSchemaName("");
+ result.setTableName("t_other");
+ return result;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
new file mode 100644
index 0000000..b6c8a1d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal;
+
+import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+
+/**
+ * Unit tests for {@link WalPosition}.
+ */
+public final class WalPositionTest {
+
+    /**
+     * Comparing with {@code null} yields 1; comparing with a position built from the same LSN value yields 0.
+     */
+    @Test
+    public void assertCompareTo() {
+        LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf(100L);
+        WalPosition actual = new WalPosition(logSequenceNumber);
+        assertThat(actual.compareTo(null), is(1));
+        WalPosition samePosition = new WalPosition(LogSequenceNumber.valueOf(100L));
+        assertThat(actual.compareTo(samePosition), is(0));
+    }
+
+    /**
+     * The JSON form of a position built from LSN 100 prints as {@code "100"}.
+     */
+    @Test
+    public void assertToJson() {
+        String actualJson = new WalPosition(LogSequenceNumber.valueOf(100L)).toJson().toString();
+        assertThat(actualJson, is("100"));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
index 6bd6966..eb800bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TestDecodingPluginTest.java
@@ -17,55 +17,86 @@
package org.apache.shardingsphere.scaling.postgresql.wal.decode;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
import org.junit.Test;
+import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.replication.LogSequenceNumber;
+
import java.nio.ByteBuffer;
+import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public final class TestDecodingPluginTest {
+ private LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8"); // shared LSN fixture passed to decode() by every test in this class
+
@Test
public void assertDecodeWriteRowEvent() {
- LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[character varying]:'1 2 3'''".getBytes());
- WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
- assertThat(actual.getLogSequenceNumber(), is(lsn));
+ WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
}
@Test
public void assertDecodeUpdateRowEvent() {
- LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: data[character varying]:'1 2 3'''".getBytes());
- UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
- assertThat(actual.getLogSequenceNumber(), is(lsn));
+ UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
}
@Test
public void assertDecodeDeleteRowEvent() {
- LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
- DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
- assertThat(actual.getLogSequenceNumber(), is(lsn));
+ DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getPrimaryKeys().get(0), is(1));
}
@Test
public void assertDecodeWriteRowEventWithByteA() {
- LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes());
- WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
- assertThat(actual.getLogSequenceNumber(), is(lsn));
+ WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
- assertThat(actual.getAfterRow().get(0), is(new byte[] {(byte) 0xff, (byte) 0, (byte) 0xab}));
+ assertThat(actual.getAfterRow().get(0), is(new byte[]{(byte) 0xff, (byte) 0, (byte) 0xab}));
+ }
+
+ @Test
+ public void assertDecodeUnknownTableType() { // the payload "unknown" must decode to a PlaceholderEvent
+ ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
+ AbstractWalEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertTrue(actual instanceof PlaceholderEvent);
+ }
+
+ @Test(expected = SyncTaskExecuteException.class)
+ public void assertDecodeUnknownRowEventType() { // an UNKNOWN row operation must raise SyncTaskExecuteException
+ ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes());
+ new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ }
+
+ @Test(expected = DecodingException.class)
+ @SneakyThrows(SQLException.class)
+ public void assertDecodeTime() { // a SQLException from the mocked TimestampUtils.toTime must surface as DecodingException
+ TimestampUtils timestampUtils = mock(TimestampUtils.class);
+ when(timestampUtils.toTime(eq(null), eq("1 2 3'"))).thenThrow(new SQLException()); // "1 2 3'" is the value the tests above assert for the quoted payload '1 2 3'''
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes());
+ new TestDecodingPlugin(timestampUtils).decode(data, logSequenceNumber);
}
}