You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/07/28 06:49:55 UTC

[shardingsphere] branch master updated: Close HA job when drop database (#19619)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ee0d0baa31 Close HA job when drop database (#19619)
7ee0d0baa31 is described below

commit 7ee0d0baa3143ae35807733e9359567e2cacbd93
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Thu Jul 28 14:49:47 2022 +0800

    Close HA job when drop database (#19619)
    
    * Close HA job when drop database
    
    * Remove close data source index
    
    * Fix ci
    
    * fix npe
---
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java               | 11 +++++++++++
 .../shardingsphere/infra/metadata/ShardingSphereMetaData.java | 10 ++++++----
 .../rule/identifier/type/DynamicDataSourceContainedRule.java  |  6 ++++++
 .../schedule/core/strategy/ScheduleStrategy.java              |  7 +++++++
 .../schedule/core/strategy/type/ClusterScheduleStrategy.java  |  6 ++++++
 .../core/strategy/type/StandaloneScheduleStrategy.java        |  5 +++++
 .../cluster/coordinator/ClusterContextManagerCoordinator.java |  6 ++++++
 7 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 0ce183d61ca..c1490e48522 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -165,6 +165,17 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
     }
     
+    @Override
+    public void closeHeartBeatJob() {
+        Optional<ScheduleStrategy> scheduleStrategy = ScheduleContextFactory.getInstance().get(instanceContext.getInstance().getCurrentInstanceId());
+        if (scheduleStrategy.isPresent()) {
+            for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+                DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+                scheduleStrategy.get().closeSchedule(rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName());
+            }
+        }
+    }
+    
     private void initHeartBeatJobs(final String instanceId) {
         Optional<ScheduleStrategy> scheduleStrategy = ScheduleContextFactory.getInstance().get(instanceId);
         if (scheduleStrategy.isPresent()) {
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 625a1e3aaf9..525bef3e484 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -22,12 +22,14 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
 
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -111,15 +113,15 @@ public final class ShardingSphereMetaData {
      * @param databaseName database name
      */
     public void dropDatabase(final String databaseName) {
-        closeResources(databases.remove(databaseName.toLowerCase()));
+        ShardingSphereDatabase toBeRemovedDatabase = databases.remove(databaseName.toLowerCase());
+        closeResources(toBeRemovedDatabase);
     }
     
     private void closeResources(final ShardingSphereDatabase database) {
-        if (null != database.getResource()) {
-            database.getResource().getDataSources().values().forEach(each -> database.getResource().close(each));
-        }
         String databaseName = database.getName();
         globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
         database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
+        database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeHeartBeatJob);
+        Optional.ofNullable(database.getResource()).ifPresent(optional -> optional.getDataSources().values().forEach(each -> database.getResource().close(each)));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
index 2e8266c47dd..497c0411680 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
@@ -53,8 +53,14 @@ public interface DynamicDataSourceContainedRule extends ShardingSphereRule {
     
     /**
      * Restart heart beat job.
+     *
      * @param event data source status changed event
      * @param instanceContext instance context
      */
     void restartHeartBeatJob(DataSourceStatusChangedEvent event, InstanceContext instanceContext);
+    
+    /**
+     * Close heart beat job.
+     */
+    void closeHeartBeatJob();
 }
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
index ebfebc6a299..495a9a401ff 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
@@ -30,4 +30,11 @@ public interface ScheduleStrategy {
      * @param job cron job
      */
     void startSchedule(CronJob job);
+    
+    /**
+     * Close schedule.
+     *
+     * @param jobName job name
+     */
+    void closeSchedule(String jobName);
 }
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
index 0da283f6cdf..16b06d33f18 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.schedule.core.strategy.ScheduleStrategy;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 /**
@@ -77,6 +78,11 @@ public final class ClusterScheduleStrategy implements ScheduleStrategy {
         SCHEDULE_JOB_BOOTSTRAP_MAP.get(job.getJobName()).schedule();
     }
     
+    @Override
+    public void closeSchedule(final String jobName) {
+        Optional.ofNullable(SCHEDULE_JOB_BOOTSTRAP_MAP.remove(jobName)).ifPresent(ScheduleJobBootstrap::shutdown);
+    }
+    
     @SneakyThrows(ConcurrentException.class)
     private CoordinatorRegistryCenter getRegistryCenter() {
         return registryCenterLazyInitializer.get();
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
index a2e6dd75a0b..d54f8bd3d38 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
@@ -29,4 +29,9 @@ public final class StandaloneScheduleStrategy implements ScheduleStrategy {
     public void startSchedule(final CronJob job) {
         // TODO
     }
+    
+    @Override
+    public void closeSchedule(final String jobName) {
+        // TODO
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 39689f22838..855acc8c144 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -170,6 +170,9 @@ public final class ClusterContextManagerCoordinator {
     @Subscribe
     public synchronized void renew(final StorageNodeChangedEvent event) {
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+            return;
+        }
         Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
                 .getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
         if (dynamicDataSourceRule.isPresent()) {
@@ -189,6 +192,9 @@ public final class ClusterContextManagerCoordinator {
      */
     @Subscribe
     public synchronized void renew(final PrimaryStateChangedEvent event) {
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+            return;
+        }
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
         contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
                 .stream()