You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/02/21 09:46:12 UTC

[shardingsphere] branch master updated: Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e5c7187300 Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
6e5c7187300 is described below

commit 6e5c71873007979eb0ea6eac86c56c1b0d267622
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Feb 21 17:46:04 2023 +0800

    Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
    
    * Refactor drop database stop heartbeat job and remove storage node data sources.
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * FIX checkstyle
---
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    | 31 ++++++++++++++--------
 .../DropDatabaseDiscoveryRuleStatementUpdater.java |  6 -----
 .../rule/ReadwriteSplittingRule.java               | 26 +++++++++++++++++-
 .../builder/ReadwriteSplittingRuleBuilder.java     |  2 +-
 .../route/ReadwriteSplittingSQLRouterTest.java     |  4 +--
 .../rule/ReadwriteSplittingRuleTest.java           |  2 +-
 .../infra/metadata/ShardingSphereMetaData.java     |  2 ++
 .../type/StaticDataSourceContainedRule.java        | 12 +++++++++
 .../event/StorageNodeDataSourceDeletedEvent.java   | 20 +++++++-------
 .../registry/status/storage/node/StorageNode.java  |  4 +--
 .../subscriber/StorageNodeStatusSubscriber.java    | 15 +++++++++--
 .../StorageNodeStatusSubscriberTest.java           | 17 +++++++++---
 .../ImportDatabaseConfigurationUpdater.java        |  2 +-
 .../rdl/rule/RuleDefinitionBackendHandler.java     | 19 +++++++++++--
 14 files changed, 119 insertions(+), 43 deletions(-)

diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 252481d87d8..737392d0063 100644
--- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -44,6 +44,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
 import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.schedule.core.ScheduleContextFactory;
 
 import javax.sql.DataSource;
@@ -119,6 +120,17 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         }
     }
     
+    private void initHeartBeatJobs() {
+        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+            DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+            String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
+            CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
+                    rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
+                    rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+            scheduleContext.startSchedule(job);
+        }
+    }
+    
     /**
      * Get single data source rule.
      *
@@ -162,24 +174,21 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(groupName);
         ShardingSpherePreconditions.checkNotNull(dataSourceRule, () -> new DBDiscoveryDataSourceRuleNotFoundException(databaseName));
         scheduleContext.closeSchedule(dataSourceRule.getProvider().getType() + "-" + databaseName + "-" + dataSourceRule.getGroupName());
+        deleteStorageNodeDataSources(dataSourceRule);
     }
     
-    @Override
-    public void closeAllHeartBeatJob() {
-        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
-            DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-            scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+    private void deleteStorageNodeDataSources(final DatabaseDiscoveryDataSourceRule dataSourceRule) {
+        for (String each : dataSourceRule.getDataSourceNames()) {
+            instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, dataSourceRule.getGroupName(), each)));
         }
     }
     
-    private void initHeartBeatJobs() {
+    @Override
+    public void closeAllHeartBeatJob() {
         for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
             DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-            String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
-            CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
-                    rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
-                    rule.getHeartbeatProps().getProperty("keep-alive-cron"));
-            scheduleContext.startSchedule(job);
+            scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+            deleteStorageNodeDataSources(rule);
         }
     }
     
diff --git a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
index 8708c833893..feb538fd2c4 100644
--- a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
+++ b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.dbdiscovery.distsql.handler.update;
 import org.apache.shardingsphere.dbdiscovery.api.config.DatabaseDiscoveryRuleConfiguration;
 import org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
 import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableConstants;
 import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableItemConstants;
 import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -51,7 +50,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
         String databaseName = database.getName();
         checkCurrentRuleConfiguration(databaseName, sqlStatement, currentRuleConfig);
         checkIsInUse(databaseName, sqlStatement, database);
-        closeHeartbeatJob(database, sqlStatement);
     }
     
     private void checkCurrentRuleConfiguration(final String databaseName, final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
@@ -81,10 +79,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
         ShardingSpherePreconditions.checkState(invalid.isEmpty(), () -> new RuleInUsedException(RULE_TYPE, databaseName, invalid));
     }
     
-    private void closeHeartbeatJob(final ShardingSphereDatabase database, final DropDatabaseDiscoveryRuleStatement sqlStatement) {
-        sqlStatement.getNames().forEach(each -> database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(optional -> optional.closeSingleHeartBeatJob(each)));
-    }
-    
     @Override
     public boolean updateCurrentRuleConfiguration(final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
         sqlStatement.getNames().forEach(each -> dropRule(currentRuleConfig, each));
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 101682eb36b..ebd843d992c 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.util.expr.InlineExpressionParser;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
 import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
 import org.apache.shardingsphere.readwritesplitting.api.strategy.DynamicReadwriteSplittingStrategyConfiguration;
@@ -59,6 +60,8 @@ import java.util.stream.Collectors;
  */
 public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceContainedRule, StaticDataSourceContainedRule, ExportableRule, StorageConnectorReusableRule {
     
+    private final String databaseName;
+    
     @Getter
     private final RuleConfiguration configuration;
     
@@ -66,7 +69,8 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
     
     private final Map<String, ReadwriteSplittingDataSourceRule> dataSourceRules;
     
-    public ReadwriteSplittingRule(final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+    public ReadwriteSplittingRule(final String databaseName, final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+        this.databaseName = databaseName;
         configuration = ruleConfig;
         for (ReadwriteSplittingDataSourceRuleConfiguration dataSourceRuleConfiguration : ruleConfig.getDataSources()) {
             if (ruleConfig.getLoadBalancers().containsKey(dataSourceRuleConfiguration.getLoadBalancerName())) {
@@ -176,6 +180,26 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
         dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(), DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus());
     }
     
+    @Override
+    public void cleanStorageNodeDataSource(final String groupName) {
+        Preconditions.checkNotNull(dataSourceRules.get(groupName), String.format("`%s` group name not exist in database `%s`", groupName, databaseName));
+        deleteStorageNodeDataSources(dataSourceRules.get(groupName));
+    }
+    
+    private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
+        if (rule.getReadwriteSplittingStrategy() instanceof DynamicReadwriteSplittingStrategy) {
+            return;
+        }
+        rule.getReadwriteSplittingStrategy().getReadDataSources().forEach(each -> new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each)));
+    }
+    
+    @Override
+    public void cleanStorageNodeDataSources() {
+        for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
+            deleteStorageNodeDataSources(entry.getValue());
+        }
+    }
+    
     @Override
     public Map<String, Object> getExportData() {
         Map<String, Object> result = new HashMap<>(2, 1);
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
index f0d3afbcbf6..1fe9cf64e35 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
@@ -36,7 +36,7 @@ public final class ReadwriteSplittingRuleBuilder implements DatabaseRuleBuilder<
     @Override
     public ReadwriteSplittingRule build(final ReadwriteSplittingRuleConfiguration config, final String databaseName,
                                         final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules, final InstanceContext instanceContext) {
-        return new ReadwriteSplittingRule(config, builtRules);
+        return new ReadwriteSplittingRule(databaseName, config, builtRules);
     }
     
     @Override
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
index 0ad3206292d..26a56b8dfbd 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
@@ -86,14 +86,14 @@ public final class ReadwriteSplittingSQLRouterTest {
     
     @Before
     public void setUp() {
-        staticRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
+        staticRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
                 new StaticReadwriteSplittingStrategyConfiguration(WRITE_DATASOURCE, Collections.singletonList(READ_DATASOURCE)), null, "")),
                 Collections.emptyMap()), Collections.emptyList());
         sqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(staticRule)).get(staticRule);
         DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class, RETURNS_DEEP_STUBS);
         when(dynamicDataSourceRule.getPrimaryDataSourceName("readwrite_ds")).thenReturn(WRITE_DATASOURCE);
         when(dynamicDataSourceRule.getReplicaDataSourceNames("readwrite_ds")).thenReturn(Collections.emptyList());
-        dynamicRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
+        dynamicRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
                 new DynamicReadwriteSplittingStrategyConfiguration("readwrite_ds", "true"), "")), Collections.emptyMap()),
                 Collections.singleton(dynamicDataSourceRule));
         dynamicSqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(dynamicRule)).get(dynamicRule);
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index a4c8ae40a45..bb117c99443 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -56,7 +56,7 @@ public final class ReadwriteSplittingRuleTest {
     private ReadwriteSplittingRule createReadwriteSplittingRule() {
         ReadwriteSplittingDataSourceRuleConfiguration config =
                 new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", new StaticReadwriteSplittingStrategyConfiguration("write_ds", Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
-        return new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(
+        return new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(
                 Collections.singleton(config), Collections.singletonMap("random", new AlgorithmConfiguration("RANDOM", new Properties()))), Collections.emptyList());
     }
     
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 11c8f301bc6..b0ae4078eef 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -125,6 +126,7 @@ public final class ShardingSphereMetaData {
         globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
         database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
         database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeAllHeartBeatJob);
+        database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
         Optional.ofNullable(database.getResourceMetaData()).ifPresent(optional -> optional.getDataSources().values().forEach(each -> database.getResourceMetaData().close(each)));
     }
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
index b12c8e6c044..728674bf615 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
@@ -31,4 +31,16 @@ public interface StaticDataSourceContainedRule extends ShardingSphereRule {
      * @param event data source status changed event
      */
     void updateStatus(DataSourceStatusChangedEvent event);
+    
+    /**
+     * Clean single storage node data source.
+     *
+     * @param groupName group name
+     */
+    void cleanStorageNodeDataSource(String groupName);
+    
+    /**
+     * Clean storage nodes data sources.
+     */
+    void cleanStorageNodeDataSources();
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
similarity index 62%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
copy to mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
index b12c8e6c044..010ed71d979 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.rule.identifier.type;
+package org.apache.shardingsphere.mode.metadata.storage.event;
 
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 
 /**
- * Static data source contained rule.
+ * Storage node data source deleted event.
  */
-public interface StaticDataSourceContainedRule extends ShardingSphereRule {
+@RequiredArgsConstructor
+@Getter
+public final class StorageNodeDataSourceDeletedEvent {
     
-    /**
-     * Update data source status.
-     *
-     * @param event data source status changed event
-     */
-    void updateStatus(DataSourceStatusChangedEvent event);
+    private final QualifiedDatabase qualifiedDatabase;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
index ef9c5003a59..4a451545dd6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
@@ -55,12 +55,12 @@ public final class StorageNode {
     }
     
     /**
-     * Get storage node status path.
+     * Get storage node data source path.
      *
      * @param database cluster database
      * @return status path of storage node
      */
-    public static String getStatusPath(final QualifiedDatabase database) {
+    public static String getStorageNodeDataSourcePath(final QualifiedDatabase database) {
         return String.join("/", getRootPath(), database.toString());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
index 6548aa98c48..7aa3904c788 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
 import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
 import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
@@ -50,7 +51,7 @@ public final class StorageNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final DataSourceDisabledEvent event) {
-        repository.persist(StorageNode.getStatusPath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
+        repository.persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
                 YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(event.getStorageNodeDataSource())));
     }
     
@@ -61,7 +62,17 @@ public final class StorageNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final PrimaryDataSourceChangedEvent event) {
-        repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
+        repository.persist(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()),
                 YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
     }
+    
+    /**
+     * Delete storage node data source.
+     *
+     * @param event storage node data source deleted event
+     */
+    @Subscribe
+    public void delete(final StorageNodeDataSourceDeletedEvent event) {
+        repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()));
+    }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
index 9571e4daf0c..f721e3db0aa 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
 import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
 import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,7 +52,7 @@ public final class StorageNodeStatusSubscriberTest {
         StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED);
         DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
         new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
-        verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED))));
     }
     
@@ -63,7 +64,7 @@ public final class StorageNodeStatusSubscriberTest {
         StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED);
         DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
         new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
-        verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
     }
     
@@ -74,7 +75,17 @@ public final class StorageNodeStatusSubscriberTest {
         String dataSourceName = "replica_ds_0";
         PrimaryDataSourceChangedEvent event = new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
         new StorageNodeStatusSubscriber(repository, eventBusContext).update(event);
-        verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
     }
+    
+    @Test
+    public void assertDeleteStorageNodeDataSourceDataSourceState() {
+        String databaseName = "replica_query_db";
+        String groupName = "readwrite_ds";
+        String dataSourceName = "replica_ds_0";
+        StorageNodeDataSourceDeletedEvent event = new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
+        new StorageNodeStatusSubscriber(repository, eventBusContext).delete(event);
+        verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)));
+    }
 }
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
index 9977705be9e..0eedb5077e9 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
@@ -169,7 +169,7 @@ public final class ImportDatabaseConfigurationUpdater implements RALUpdater<Impo
                 ReadwriteSplittingRuleConfiguration readwriteSplittingRuleConfig = new YamlReadwriteSplittingRuleConfigurationSwapper().swapToObject((YamlReadwriteSplittingRuleConfiguration) each);
                 readwriteSplittingRuleConfigurationImportChecker.check(database, readwriteSplittingRuleConfig);
                 ruleConfigs.add(readwriteSplittingRuleConfig);
-                rules.add(new ReadwriteSplittingRule(readwriteSplittingRuleConfig, rules));
+                rules.add(new ReadwriteSplittingRule(databaseName, readwriteSplittingRuleConfig, rules));
             } else if (each instanceof YamlDatabaseDiscoveryRuleConfiguration) {
                 DatabaseDiscoveryRuleConfiguration databaseDiscoveryRuleConfig = new YamlDatabaseDiscoveryRuleConfigurationSwapper().swapToObject((YamlDatabaseDiscoveryRuleConfiguration) each);
                 databaseDiscoveryRuleConfigurationImportChecker.check(database, databaseDiscoveryRuleConfig);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index df4b5b33f3e..b98fab4185b 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
 
+import org.apache.shardingsphere.dbdiscovery.distsql.handler.update.DropDatabaseDiscoveryRuleStatementUpdater;
+import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
 import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionAlterUpdater;
 import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionCreateUpdater;
 import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionDropUpdater;
@@ -24,6 +26,8 @@ import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionUpdater;
 import org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -31,6 +35,8 @@ import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.RDLBackendHan
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.readwritesplitting.distsql.handler.update.DropReadwriteSplittingRuleStatementUpdater;
+import org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.DropReadwriteSplittingRuleStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Collection;
@@ -85,7 +91,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
             result.remove(currentRuleConfig);
             result.add(processAlter(sqlStatement, (RuleDefinitionAlterUpdater) updater, currentRuleConfig));
         } else if (updater instanceof RuleDefinitionDropUpdater) {
-            processDrop(result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
+            processDrop(database, result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
         } else {
             throw new UnsupportedSQLOperationException(String.format("Cannot support RDL updater type `%s`", updater.getClass().getName()));
         }
@@ -110,13 +116,22 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
     }
     
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private void processDrop(final Collection<RuleConfiguration> configs, final T sqlStatement, final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
+    private void processDrop(final ShardingSphereDatabase database, final Collection<RuleConfiguration> configs, final T sqlStatement,
+                             final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
         if (!updater.hasAnyOneToBeDropped(sqlStatement, currentRuleConfig)) {
             return;
         }
         if (updater.updateCurrentRuleConfiguration(sqlStatement, currentRuleConfig)) {
             configs.remove(currentRuleConfig);
         }
+        if (updater instanceof DropDatabaseDiscoveryRuleStatementUpdater) {
+            database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class)
+                    .ifPresent(optional -> ((DropDatabaseDiscoveryRuleStatement) sqlStatement).getNames().forEach(optional::closeSingleHeartBeatJob));
+        }
+        if (updater instanceof DropReadwriteSplittingRuleStatementUpdater) {
+            database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
+                    .ifPresent(optional -> ((DropReadwriteSplittingRuleStatement) sqlStatement).getNames().forEach(optional::cleanStorageNodeDataSource));
+        }
     }
     
     private boolean getRefreshStatus(final SQLStatement sqlStatement, final RuleConfiguration currentRuleConfig, final RuleDefinitionUpdater<?, ?> updater) {