You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/07 12:45:17 UTC

[shardingsphere] branch master updated: Drop PostgreSQL replication slot when scaling task finished (#12267)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 28893b5  Drop PostgreSQL replication slot when scaling task finished (#12267)
28893b5 is described below

commit 28893b5c3cd8b635063646b757fe037406f79418
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 7 20:44:48 2021 +0800

    Drop PostgreSQL replication slot when scaling task finished (#12267)
    
    * Add destroy method for PositionInitializer
    
    * Drop slot when scaling task finished
    
    * Stop replication before dropping slot
---
 .../scaling/core/job/FinishedCheckJob.java         |  4 ++-
 .../scaling/core/job/JobContext.java               |  3 ++
 .../shardingsphere/scaling/core/job/JobStatus.java |  5 +++
 .../scaling/core/job/ScalingJob.java               |  1 +
 .../core/job/position/PositionInitializer.java     |  9 ++++++
 .../core/job/preparer/ScalingJobPreparer.java      | 17 ++++++++++
 .../scaling/core/job/schedule/JobScheduler.java    |  9 ++++++
 .../core/job/schedule/JobSchedulerCenter.java      | 29 ++++++++++-------
 .../component/PostgreSQLPositionInitializer.java   | 37 ++++++++++++++++++++++
 .../postgresql/component/PostgreSQLWalDumper.java  |  5 ++-
 .../PostgreSQLPositionInitializerTest.java         | 22 +++++++++++++
 11 files changed, 126 insertions(+), 15 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index b4db0ab..249232c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
 import org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.scaling.core.job.schedule.JobSchedulerCenter;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
 
 import java.util.List;
@@ -74,10 +75,11 @@ public final class FinishedCheckJob implements SimpleJob {
         } else {
             log.info("dataConsistencyCheckAlgorithm is not configured, data consistency check will be ignored.");
         }
-        scalingAPI.stop(jobId);
+        JobSchedulerCenter.getJobContext(jobId).ifPresent(jobContext -> jobContext.setStatus(JobStatus.ALMOST_FINISHED));
         ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
         ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
         ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+        scalingAPI.stop(jobId);
     }
     
     private boolean dataConsistencyCheck(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
index 540babb..880db2f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
 import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
@@ -52,6 +53,8 @@ public final class JobContext {
     
     private JobConfiguration jobConfig;
     
+    private ScalingJobPreparer jobPreparer;
+    
     public JobContext(final JobConfiguration jobConfig) {
         this.jobConfig = jobConfig;
         JobConfigurationUtil.fillInProperties(jobConfig);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
index 2ef07cf..68d7bab 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobStatus.java
@@ -48,6 +48,11 @@ public enum JobStatus {
     EXECUTE_INCREMENTAL_TASK(true),
     
     /**
+     * Job is almost finished.
+     */
+    ALMOST_FINISHED(true),
+    
+    /**
      * Job is stopping.
      */
     STOPPING(true),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index 02e8891..a7c9bd5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -44,6 +44,7 @@ public final class ScalingJob implements SimpleJob {
         jobConfig.getHandleConfig().setShardingItem(shardingContext.getShardingItem());
         JobContext jobContext = new JobContext(jobConfig);
         jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
+        jobContext.setJobPreparer(jobPreparer);
         jobPreparer.prepare(jobContext);
         governanceRepositoryAPI.persistJobProgress(jobContext);
         JobSchedulerCenter.start(jobContext);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 30865c8..d9fa7f5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -41,4 +41,13 @@ public interface PositionInitializer {
      * @return position
      */
     ScalingPosition<?> init(String data);
+    
+    /**
+     * Clean up by data source if necessary.
+     *
+     * @param dataSource data source
+     * @throws SQLException SQL exception
+     */
+    default void destroy(DataSource dataSource) throws SQLException {
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 9120a54..18a36eb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
 import org.apache.shardingsphere.scaling.core.job.check.source.DataSourceChecker;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
 import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter;
@@ -99,4 +100,20 @@ public final class ScalingJobPreparer {
         }
         return PositionInitializerFactory.newInstance(taskConfig.getHandleConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
     }
+    
+    /**
+     * Do cleanup work for scaling job.
+     *
+     * @param jobContext job context
+     */
+    public void cleanup(final JobContext jobContext) {
+        try (DataSourceManager dataSourceManager = new DataSourceManager(jobContext.getTaskConfigs())) {
+            for (TaskConfiguration each : jobContext.getTaskConfigs()) {
+                PositionInitializer positionInitializer = PositionInitializerFactory.newInstance(each.getHandleConfig().getDatabaseType());
+                positionInitializer.destroy(dataSourceManager.getDataSource(each.getDumperConfig().getDataSourceConfig()));
+            }
+        } catch (final SQLException ex) {
+            log.warn("Scaling job destroying failed", ex);
+        }
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
index 0dd86c8..376214338 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobScheduler.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.executor.engine.ExecuteCallback;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
 
@@ -49,6 +50,7 @@ public final class JobScheduler implements Runnable {
      */
     public void stop() {
         log.info("stop scaling job {}", jobContext.getJobId());
+        final boolean almostFinished = jobContext.getStatus() == JobStatus.ALMOST_FINISHED;
         if (jobContext.getStatus().isRunning()) {
             jobContext.setStatus(JobStatus.STOPPING);
         }
@@ -60,6 +62,13 @@ public final class JobScheduler implements Runnable {
             log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
         }
+        if (almostFinished) {
+            log.info("almost finished, preparer cleanup, job {}", jobContext.getJobId());
+            ScalingJobPreparer jobPreparer = jobContext.getJobPreparer();
+            if (null != jobPreparer) {
+                jobPreparer.cleanup(jobContext);
+            }
+        }
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
index c05f73a..4f2dd25 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/schedule/JobSchedulerCenter.java
@@ -26,9 +26,9 @@ import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public final class JobSchedulerCenter {
     
-    private static final Map<String, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
+    private static final Map<Long, JobScheduler> JOB_SCHEDULER_MAP = Maps.newConcurrentMap();
     
     private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
     
@@ -56,7 +56,7 @@ public final class JobSchedulerCenter {
      * @param jobContext job context
      */
     public static void start(final JobContext jobContext) {
-        String key = String.format("%d-%d", jobContext.getJobId(), jobContext.getShardingItem());
+        Long key = jobContext.getJobId();
         if (JOB_SCHEDULER_MAP.containsKey(key)) {
             return;
         }
@@ -71,21 +71,28 @@ public final class JobSchedulerCenter {
      * @param jobId job id
      */
     public static void stop(final long jobId) {
-        Iterator<Entry<String, JobScheduler>> iterator = JOB_SCHEDULER_MAP.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Entry<String, JobScheduler> entry = iterator.next();
-            if (entry.getKey().startsWith(String.format("%d-", jobId))) {
-                entry.getValue().stop();
-                iterator.remove();
-            }
+        JobScheduler jobScheduler = JOB_SCHEDULER_MAP.remove(jobId);
+        if (null != jobScheduler) {
+            jobScheduler.stop();
         }
     }
     
+    /**
+     * Get job context.
+     *
+     * @param jobId job id
+     * @return job context
+     */
+    public static Optional<JobContext> getJobContext(final long jobId) {
+        JobScheduler jobScheduler = JOB_SCHEDULER_MAP.get(jobId);
+        return Optional.ofNullable(null != jobScheduler ? jobScheduler.getJobContext() : null);
+    }
+    
     private static final class PersistJobContextRunnable implements Runnable {
         
         @Override
         public void run() {
-            for (Entry<String, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
+            for (Entry<Long, JobScheduler> entry : JOB_SCHEDULER_MAP.entrySet()) {
                 try {
                     REGISTRY_REPOSITORY_API.persistJobProgress(entry.getValue().getJobContext());
                     // CHECKSTYLE:OFF
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 457bdc3..d55fcb1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.postgresql.component;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
 import org.postgresql.replication.LogSequenceNumber;
@@ -31,6 +32,7 @@ import java.sql.SQLException;
 /**
  * PostgreSQL wal position initializer.
  */
+@Slf4j
 public final class PostgreSQLPositionInitializer implements PositionInitializer {
     
     public static final String SLOT_NAME = "sharding_scaling";
@@ -53,6 +55,10 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     }
     
     private void createIfNotExists(final Connection connection) throws SQLException {
+        if (checkSlotExistsOrNot(connection)) {
+            log.info("replication slot already exist, slot name: {}", SLOT_NAME);
+            return;
+        }
         try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
             ps.execute();
         } catch (final PSQLException ex) {
@@ -62,6 +68,17 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
         }
     }
     
+    private boolean checkSlotExistsOrNot(final Connection connection) throws SQLException {
+        String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+        try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
+            preparedStatement.setString(1, SLOT_NAME);
+            preparedStatement.setString(2, DECODE_PLUGIN);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                return resultSet.next();
+            }
+        }
+    }
+    
     private WalPosition getWalPosition(final Connection connection) throws SQLException {
         try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
              ResultSet rs = ps.executeQuery()) {
@@ -79,4 +96,24 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
         }
         throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
     }
+    
+    @Override
+    public void destroy(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            dropSlotIfExists(connection);
+        }
+    }
+    
+    private void dropSlotIfExists(final Connection connection) throws SQLException {
+        if (!checkSlotExistsOrNot(connection)) {
+            log.info("drop, slot not exist, slot name: {}", SLOT_NAME);
+            return;
+        }
+        log.info("drop, slot exist, slot name: {}", SLOT_NAME);
+        String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
+        try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
+            preparedStatement.setString(1, SLOT_NAME);
+            preparedStatement.execute();
+        }
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 654f3f3..98857d6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -75,10 +75,9 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
     }
     
     private void dump() {
-        try {
-            Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
+        try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
+             PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
             DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
-            PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
index 8b0e9a7..67c956b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -67,6 +68,7 @@ public final class PostgreSQLPositionInitializerTest {
     
     @Test
     public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
+        mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
         WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
@@ -75,6 +77,7 @@ public final class PostgreSQLPositionInitializerTest {
     
     @Test
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
+        mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
         WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
         assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
@@ -82,6 +85,7 @@ public final class PostgreSQLPositionInitializerTest {
     
     @Test(expected = RuntimeException.class)
     public void assertGetCurrentPositionThrowException() throws SQLException {
+        mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
         new PostgreSQLPositionInitializer().init(dataSource);
@@ -106,4 +110,22 @@ public final class PostgreSQLPositionInitializerTest {
         when(resultSet.getString(1)).thenReturn(POSTGRESQL_10_LSN);
         return result;
     }
+    
+    @SneakyThrows(SQLException.class)
+    private void mockSlotExistsOrNot(final boolean exists) {
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        when(connection.prepareStatement("SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?")).thenReturn(preparedStatement);
+        ResultSet resultSet = mock(ResultSet.class);
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        when(resultSet.next()).thenReturn(exists);
+    }
+    
+    @Test
+    public void assertDestroyWhenSlotExists() throws SQLException {
+        mockSlotExistsOrNot(true);
+        PreparedStatement preparedStatement = mock(PreparedStatement.class);
+        when(connection.prepareStatement("SELECT pg_drop_replication_slot(?)")).thenReturn(preparedStatement);
+        new PostgreSQLPositionInitializer().destroy(dataSource);
+        verify(preparedStatement).execute();
+    }
 }