You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/12 02:50:29 UTC

[shardingsphere] branch master updated: Refactor replication slot cleanup for openGauss scaling job (#16746)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b53353ef41 Refactor replication slot cleanup for openGauss scaling job (#16746)
2b53353ef41 is described below

commit 2b53353ef4179ffeb5e6a36ca8b44858f7773a97
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Apr 12 10:50:21 2022 +0800

    Refactor replication slot cleanup for openGauss scaling job (#16746)
    
    * Refactor replication slot cleanup
    
    * Add TODO
    
    * Refactor cleanup, clean all source data sources
    
    * Fix code concurrent modification conflict
---
 .../pool/creator/DataSourcePoolCreator.java        |  4 ++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  4 +---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 15 ++----------
 .../pipeline/core/execute/PipelineJobExecutor.java | 16 ++++++++++++-
 .../rulealtered/RuleAlteredJobPreparer.java        | 28 +++++++++++++++++-----
 .../RuleAlteredJobProgressDetector.java            | 13 ++++++++++
 .../rulealtered/RuleAlteredJobSchedulerCenter.java |  4 +++-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  1 +
 .../ingest/OpenGaussPositionInitializer.java       |  1 +
 .../ingest/wal/OpenGaussLogicalReplication.java    | 17 +++++++++----
 .../ingest/PostgreSQLPositionInitializer.java      |  1 +
 .../subscriber/ScalingRegistrySubscriber.java      |  3 ---
 .../handler/CuratorZookeeperExceptionHandler.java  |  2 ++
 13 files changed, 78 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
index 594bea819b1..3fae4695b34 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/datasource/pool/creator/DataSourcePoolCreator.java
@@ -40,6 +40,9 @@ import java.util.stream.Collectors;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class DataSourcePoolCreator {
     
+    // TODO pipeline doesn't need cache even if cache is enabled, since there might be some temp data sources
+    // TODO when all data source configurations of instance are dropped by DistSQL, cached data source should be closed
+    
     /**
      * Create data sources.
      *
@@ -62,6 +65,7 @@ public final class DataSourcePoolCreator {
                 return GlobalDataSourceRegistry.getInstance().getCachedDataSources().get(dataSourceProps.getInstance());
             }
         }
+        // TODO when aggregation is enabled, some data source properties should be changed, e.g. maxPoolSize
         DataSource result = createDataSource(dataSourceProps.getDataSourceClassName());
         Optional<DataSourcePoolMetaData> poolMetaData = DataSourcePoolMetaDataFactory.newInstance(dataSourceProps.getDataSourceClassName());
         DataSourceReflection dataSourceReflection = new DataSourceReflection(result);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 1867c5bcb98..549c3b7d259 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -51,9 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     public void stop(final String jobId) {
         log.info("Stop pipeline job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        if (jobConfigPOJO.isDisabled()) {
-            return;
-        }
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
@@ -64,6 +61,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         log.info("Remove pipeline job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         verifyJobStopped(jobConfigPOJO);
+        // TODO release lock
         PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
         PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index c65784f106e..370265c5dea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -37,9 +37,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecuti
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,7 +66,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -361,17 +359,8 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         for (int each : repositoryAPI.getShardingItems(jobId)) {
             repositoryAPI.updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
         }
+        RuleAlteredJobSchedulerCenter.stop(jobId);
         stop(jobId);
-        // TODO clean up should be done after the task is complete.
-        try {
-            TimeUnit.SECONDS.sleep(1L);
-        } catch (final InterruptedException ex) {
-            log.error(ex.getMessage());
-        }
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
-        RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
-        jobPreparer.cleanup(jobContext);
-        jobContext.close();
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 898528c1ee6..c1acd9ca7fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,12 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -49,15 +53,25 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     }
     
     private void watchGovernanceRepositoryConfiguration() {
+        RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
         PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> {
             Optional<JobConfigurationPOJO> jobConfigPOJOOptional = getJobConfigPOJO(event);
             if (!jobConfigPOJOOptional.isPresent()) {
                 return;
             }
             JobConfigurationPOJO jobConfigPOJO = jobConfigPOJOOptional.get();
-            if (DataChangedEvent.Type.DELETED == event.getType() || jobConfigPOJO.isDisabled()) {
+            boolean deleted = DataChangedEvent.Type.DELETED == event.getType();
+            boolean disabled = jobConfigPOJO.isDisabled();
+            if (deleted || disabled) {
+                log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
                 RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
                 JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+                if (deleted) {
+                    new RuleAlteredJobPreparer().cleanup(jobConfig);
+                } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getHandleConfig().getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
+                    log.info("isJobSuccessful=true");
+                    new RuleAlteredJobPreparer().cleanup(jobConfig);
+                }
                 ScalingReleaseSchemaNameLockEvent releaseLockEvent = new ScalingReleaseSchemaNameLockEvent(jobConfig.getWorkflowConfig().getSchemaName());
                 ShardingSphereEventBus.getInstance().post(releaseLockEvent);
                 return;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index a753250a628..f66d063e163 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -39,6 +41,11 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.Inve
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
@@ -183,16 +190,25 @@ public final class RuleAlteredJobPreparer {
     /**
      * Do cleanup work for scaling job.
      *
-     * @param jobContext job context
+     * @param jobConfig job configuration
      */
-    public void cleanup(final RuleAlteredJobContext jobContext) {
-        PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
+    public void cleanup(final JobConfiguration jobConfig) {
         try {
-            TaskConfiguration taskConfig = jobContext.getTaskConfig();
-            PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(taskConfig.getHandleConfig().getSourceDatabaseType());
-            positionInitializer.destroy(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
+            cleanup0(jobConfig);
         } catch (final SQLException ex) {
             log.warn("Scaling job destroying failed", ex);
         }
     }
+    
+    private void cleanup0(final JobConfiguration jobConfig) throws SQLException {
+        DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getHandleConfig().getSourceDatabaseType());
+        PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(databaseType.getName());
+        ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration)
+                PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+        for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values()) {
+            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+                positionInitializer.destroy(dataSource);
+            }
+        }
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
index 4048df2e970..15deded91ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
@@ -57,4 +58,16 @@ public final class RuleAlteredJobProgressDetector {
         return jobShardingCount == jobProgresses.size()
                 && jobProgresses.stream().allMatch(each -> null != each && !each.getStatus().isRunning());
     }
+    
+    /**
+     * Whether job is successful.
+     *
+     * @param jobShardingCount job sharding count
+     * @param jobProgresses job progresses
+     * @return successful or not
+     */
+    public static boolean isJobSuccessful(final int jobShardingCount, final Collection<JobProgress> jobProgresses) {
+        return jobShardingCount == jobProgresses.size()
+                && jobProgresses.stream().allMatch(each -> null != each && JobStatus.FINISHED == each.getStatus());
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 4f1db0bcea3..aae0bbfff89 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -73,13 +73,15 @@ public final class RuleAlteredJobSchedulerCenter {
      */
     public static void stop(final String jobId) {
         log.info("remove and stop {}", jobId);
-        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.remove(jobId);
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
         if (null == schedulerMap) {
+            log.info("schedulerMap is null, ignore");
             return;
         }
         for (Entry<Integer, RuleAlteredJobScheduler> entry : schedulerMap.entrySet()) {
             entry.getValue().stop();
         }
+        JOB_SCHEDULER_MAP.remove(jobId);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 95f9c300439..43e777ca587 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -181,6 +181,7 @@ public final class RuleAlteredJobWorker {
             log.info("Switch rule configuration immediately.");
             ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), event.getActiveVersion(), event.getNewVersion());
             ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+            ShardingSphereEventBus.getInstance().post(new ScalingReleaseSchemaNameLockEvent(event.getSchemaName()));
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 8b9ead47f6d..12d2aa93e76 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -37,6 +37,7 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     @Override
     public WalPosition init(final DataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
+            // TODO createSlotIfNotExists
             return getWalPosition(connection);
         }
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 12740630642..9f20968ad19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
@@ -37,8 +38,10 @@ import java.util.Properties;
 /**
  * Logical replication for openGauss.
  */
+@Slf4j
 public final class OpenGaussLogicalReplication {
     
+    // TODO it should be private
     public static final String SLOT_NAME_PREFIX = "sharding_scaling";
     
     public static final String DECODE_PLUGIN = "mppdb_decoding";
@@ -90,7 +93,8 @@ public final class OpenGaussLogicalReplication {
      * @throws SQLException SQL exception
      */
     public static void createIfNotExists(final Connection connection) throws SQLException {
-        if (!isSlotNameExist(connection)) {
+        String slotName = getUniqueSlotName(connection);
+        if (!isSlotNameExist(connection, slotName)) {
             createSlotBySQL(connection);
         }
     }
@@ -102,16 +106,21 @@ public final class OpenGaussLogicalReplication {
      * @throws SQLException drop SQL with error
      */
     public static void dropSlot(final Connection connection) throws SQLException {
-        String sql = String.format("select * from pg_drop_replication_slot('%s')", getUniqueSlotName(connection));
+        String slotName = getUniqueSlotName(connection);
+        if (!isSlotNameExist(connection, slotName)) {
+            log.info("dropSlot, slot not exist, ignore, slotName={}", slotName);
+            return;
+        }
+        String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
         try (CallableStatement callableStatement = connection.prepareCall(sql)) {
             callableStatement.execute();
         }
     }
     
-    private static boolean isSlotNameExist(final Connection connection) throws SQLException {
+    private static boolean isSlotNameExist(final Connection connection, final String slotName) throws SQLException {
         String sql = "select * from pg_replication_slots where slot_name=?";
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, getUniqueSlotName(connection));
+            preparedStatement.setString(1, slotName);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 return resultSet.next();
             }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index e2de501621b..14b2bfa838c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
 @Slf4j
 public final class PostgreSQLPositionInitializer implements PositionInitializer {
     
+    // TODO it should be private; _PREFIX;
     public static final String SLOT_NAME = "sharding_scaling";
     
     public static final String DECODE_PLUGIN = "test_decoding";
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index ef496dfd507..46d7681ed1e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -21,7 +21,6 @@ import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseSchemaNameLockEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionPreparedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -81,7 +80,5 @@ public final class ScalingRegistrySubscriber {
         } else {
             log.error("targetActiveVersion does not match current activeVersion, targetActiveVersion={}, activeVersion={}", targetActiveVersion, activeVersion.orElse(null));
         }
-        ScalingReleaseSchemaNameLockEvent releaseLockEvent = new ScalingReleaseSchemaNameLockEvent(event.getTargetSchemaName());
-        ShardingSphereEventBus.getInstance().post(releaseLockEvent);
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java
index 4627866fb06..519f1f98341 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/handler/CuratorZookeeperExceptionHandler.java
@@ -41,11 +41,13 @@ public final class CuratorZookeeperExceptionHandler {
      */
     public static void handleException(final Exception cause) {
         if (null == cause) {
+            log.info("cause is null");
             return;
         }
         if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
             log.debug("Ignored exception for: {}", cause.getMessage());
         } else if (cause instanceof InterruptedException) {
+            log.info("InterruptedException caught");
             Thread.currentThread().interrupt();
         } else {
             throw new ClusterPersistRepositoryException(cause);