You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/07 12:45:17 UTC
[shardingsphere] branch master updated: Drop PostgreSQL replication
slot when scaling task finished (#12267)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 28893b5 Drop PostgreSQL replication slot when scaling task finished (#12267)
28893b5 is described below
commit 28893b5c3cd8b635063646b757fe037406f79418
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 7 20:44:48 2021 +0800
Drop PostgreSQL replication slot when scaling task finished (#12267)
* Add destroy method for PositionInitializer
* Drop slot when scaling task finished
* Stop replication before drop slot
---
.../scaling/core/job/FinishedCheckJob.java | 4 ++-
.../scaling/core/job/JobContext.java | 3 ++
.../shardingsphere/scaling/core/job/JobStatus.java | 5 +++
.../scaling/core/job/ScalingJob.java | 1 +
.../core/job/position/PositionInitializer.java | 9 ++++++
.../core/job/preparer/ScalingJobPreparer.java | 17 ++++++++++
.../scaling/core/job/schedule/JobScheduler.java | 9 ++++++
.../core/job/schedule/JobSchedulerCenter.java | 29 ++++++++++-------
.../component/PostgreSQLPositionInitializer.java | 37 ++++++++++++++++++++++
.../postgresql/component/PostgreSQLWalDumper.java | 5 ++-
.../PostgreSQLPositionInitializerTest.java | 22 +++++++++++++
11 files changed, 126 insertions(+), 15 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index b4db0ab..249232c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.scaling.core.job.schedule.JobSchedulerCenter;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
import java.util.List;
@@ -74,10 +75,11 @@ public final class FinishedCheckJob implements SimpleJob {
} else {
log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
}
- scalingAPI.stop(jobId);
+ JobSchedulerCenter.getJobContext(jobId).ifPresent(jobContext -> jobContext.setStatus(JobStatus.ALMOST_FINISHED));
ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+ scalingAPI.stop(jobId);
}
private boolean dataConsistencyCheck(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
index 540babb..880db2f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
@@ -52,6 +53,8 @@ public final class JobContext {
private JobConfiguration jobConfig;
+ private ScalingJobPreparer jobPreparer;
+
public JobContext(final JobConfiguration jobConfig) {
this.jobConfig = jobConfig;
JobConfigurationUtil.fillInProperties(jobConfig);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
index 2ef07cf..68d7bab 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
@@ -48,6 +48,11 @@ public enum JobStatus {
EXECUTE_INCREMENTAL_TASK(true),
/**
+ * Job is almost finished.
+ */
+ ALMOST_FINISHED(true),
+
+ /**
* Job is stopping.
*/
STOPPING(true),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index 02e8891..a7c9bd5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -44,6 +44,7 @@ public final class ScalingJob implements SimpleJob {
jobConfig.getHandleConfig().setShardingItem(shardingContext.getShardingItem());
JobContext jobContext = new JobContext(jobConfig);
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
+ jobContext.setJobPreparer(jobPreparer);
jobPreparer.prepare(jobContext);
governanceRepositoryAPI.persistJobProgress(jobContext);
JobSchedulerCenter.start(jobContext);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 30865c8..d9fa7f5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -41,4 +41,13 @@ public interface PositionInitializer {
* @return position
*/
ScalingPosition<?> init(String data);
+
+ /**
+ * Clean up by data source if necessary.
+ *
+ * @param dataSource data source
+ * @throws SQLException SQL exception
+ */
+ default void destroy(DataSource dataSource) throws SQLException {
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 9120a54..18a36eb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.JobStatus;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.check.source.DataSourceChecker;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter;
@@ -99,4 +100,20 @@ public final class ScalingJobPreparer {
}
return PositionInitializerFactory.newInstance(taskConfig.getHandleConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
}
+
+ /**
+ * Do cleanup work for scaling job.
+ *
+ * @param jobContext job context
+ */
+ public void cleanup(final JobContext jobContext) {
+ try (DataSourceManager dataSourceManager = new DataSourceManager(jobContext.getTaskConfigs())) {
+ for (TaskConfiguration each : jobContext.getTaskConfigs()) {
+ PositionInitializer positionInitializer = PositionInitializerFactory.newInstance(each.getHandleConfig().getDatabaseType());
+ positionInitializer.destroy(dataSourceManager.getDataSource(each.getDumperConfig().getDataSourceConfig()));
+ }
+ } catch (final SQLException ex) {
+ log.warn("Scaling job destroying failed", ex);
+ }
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
index 0dd86c8..376214338 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.executor.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
@@ -49,6 +50,7 @@ public final class JobScheduler implements Runnable {
*/
public void stop() {
log.info("stop scaling job {}", jobContext.getJobId());
+ final boolean almostFinished = jobContext.getStatus() == JobStatus.ALMOST_FINISHED;
if (jobContext.getStatus().isRunning()) {
jobContext.setStatus(JobStatus.STOPPING);
}
@@ -60,6 +62,13 @@ public final class JobScheduler implements Runnable {
log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
}
+ if (almostFinished) {
+ log.info("almost finished, preparer cleanup, job {}", jobContext.getJobId());
+ ScalingJobPreparer jobPreparer = jobContext.getJobPreparer();
+ if (null != jobPreparer) {
+ jobPreparer.cleanup(jobContext);
+ }
+ }
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
index c05f73a..4f2dd25 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
@@ -26,9 +26,9 @@ import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import org.apache.shardingsphere.scaling.core.job.JobContext;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class JobSchedulerCenter {
- private static final Map<String, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
+ private static final Map<Long, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
@@ -56,7 +56,7 @@ public final class JobSchedulerCenter {
* @param jobContext job context
*/
public static void start(final JobContext jobContext) {
- String key = String.format("%d-%d", jobContext.getJobId(), jobContext.getShardingItem());
+ Long key = jobContext.getJobId();
if (JOB_SCHEDULER_MAP.containsKey(key)) {
return;
}
@@ -71,21 +71,28 @@ public final class JobSchedulerCenter {
* @param jobId job id
*/
public static void stop(final long jobId) {
- Iterator<Entry<String, JobScheduler>> iterator = JOB_SCHEDULER_MAP.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, JobScheduler> entry = iterator.next();
- if (entry.getKey().startsWith(String.format("%d-", jobId))) {
- entry.getValue().stop();
- iterator.remove();
- }
+ JobScheduler jobScheduler = JOB_SCHEDULER_MAP.remove(jobId);
+ if (null != jobScheduler) {
+ jobScheduler.stop();
}
}
+ /**
+ * Get job context.
+ *
+ * @param jobId job id
+ * @return job context
+ */
+ public static Optional<JobContext> getJobContext(final long jobId) {
+ JobScheduler jobScheduler = JOB_SCHEDULER_MAP.get(jobId);
+ return Optional.ofNullable(null != jobScheduler ? jobScheduler.getJobContext() : null);
+ }
+
private static final class PersistJobContextRunnable implements Runnable {
@Override
public void run() {
- for (Entry<String, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
+ for (Entry<Long, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
try {
REGISTRY_REPOSITORY_API.persistJobProgress(entry.getValue().getJobContext());
// CHECKSTYLE:OFF
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 457bdc3..d55fcb1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql.component;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.postgresql.replication.LogSequenceNumber;
@@ -31,6 +32,7 @@ import java.sql.SQLException;
/**
* PostgreSQL wal position initializer.
*/
+@Slf4j
public final class PostgreSQLPositionInitializer implements PositionInitializer {
public static final String SLOT_NAME = "sharding_scaling";
@@ -53,6 +55,10 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
private void createIfNotExists(final Connection connection) throws SQLException {
+ if (checkSlotExistsOrNot(connection)) {
+ log.info("replication slot already exist, slot name: {}", SLOT_NAME);
+ return;
+ }
try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
ps.execute();
} catch (final PSQLException ex) {
@@ -62,6 +68,17 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
}
+ private boolean checkSlotExistsOrNot(final Connection connection) throws SQLException {
+ String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+ try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
+ preparedStatement.setString(1, SLOT_NAME);
+ preparedStatement.setString(2, DECODE_PLUGIN);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.next();
+ }
+ }
+ }
+
private WalPosition getWalPosition(final Connection connection) throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
ResultSet rs = ps.executeQuery()) {
@@ -79,4 +96,24 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
}
+
+ @Override
+ public void destroy(final DataSource dataSource) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ dropSlotIfExists(connection);
+ }
+ }
+
+ private void dropSlotIfExists(final Connection connection) throws SQLException {
+ if (!checkSlotExistsOrNot(connection)) {
+ log.info("drop, slot not exist, slot name: {}", SLOT_NAME);
+ return;
+ }
+ log.info("drop, slot exist, slot name: {}", SLOT_NAME);
+ String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
+ try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
+ preparedStatement.setString(1, SLOT_NAME);
+ preparedStatement.execute();
+ }
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 654f3f3..98857d6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -75,10 +75,9 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
}
private void dump() {
- try {
- Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
+ try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
+ PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
- PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
index 8b0e9a7..67c956b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -67,6 +68,7 @@ public final class PostgreSQLPositionInitializerTest {
@Test
public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
+ mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
@@ -75,6 +77,7 @@ public final class PostgreSQLPositionInitializerTest {
@Test
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
+ mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
@@ -82,6 +85,7 @@ public final class PostgreSQLPositionInitializerTest {
@Test(expected = RuntimeException.class)
public void assertGetCurrentPositionThrowException() throws SQLException {
+ mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
new PostgreSQLPositionInitializer().init(dataSource);
@@ -106,4 +110,22 @@ public final class PostgreSQLPositionInitializerTest {
when(resultSet.getString(1)).thenReturn(POSTGRESQL_10_LSN);
return result;
}
+
+ @SneakyThrows(SQLException.class)
+ private void mockSlotExistsOrNot(final boolean exists) {
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ when(connection.prepareStatement("SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?")).thenReturn(preparedStatement);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(preparedStatement.executeQuery()).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(exists);
+ }
+
+ @Test
+ public void assertDestroyWhenSlotExists() throws SQLException {
+ mockSlotExistsOrNot(true);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ when(connection.prepareStatement("SELECT pg_drop_replication_slot(?)")).thenReturn(preparedStatement);
+ new PostgreSQLPositionInitializer().destroy(dataSource);
+ verify(preparedStatement).execute();
+ }
}