You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/07/28 06:49:55 UTC
[shardingsphere] branch master updated: Close HA job when drop database (#19619)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7ee0d0baa31 Close HA job when drop database (#19619)
7ee0d0baa31 is described below
commit 7ee0d0baa3143ae35807733e9359567e2cacbd93
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Thu Jul 28 14:49:47 2022 +0800
Close HA job when drop database (#19619)
* Close HA job when drop database
* Remove close data source index
* Fix CI
* Fix NPE
---
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 11 +++++++++++
.../shardingsphere/infra/metadata/ShardingSphereMetaData.java | 10 ++++++----
.../rule/identifier/type/DynamicDataSourceContainedRule.java | 6 ++++++
.../schedule/core/strategy/ScheduleStrategy.java | 7 +++++++
.../schedule/core/strategy/type/ClusterScheduleStrategy.java | 6 ++++++
.../core/strategy/type/StandaloneScheduleStrategy.java | 5 +++++
.../cluster/coordinator/ClusterContextManagerCoordinator.java | 6 ++++++
7 files changed, 47 insertions(+), 4 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 0ce183d61ca..c1490e48522 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -165,6 +165,17 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
}
+ @Override
+ public void closeHeartBeatJob() {
+ Optional<ScheduleStrategy> scheduleStrategy = ScheduleContextFactory.getInstance().get(instanceContext.getInstance().getCurrentInstanceId());
+ if (scheduleStrategy.isPresent()) {
+ for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+ DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+ scheduleStrategy.get().closeSchedule(rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName());
+ }
+ }
+ }
+
private void initHeartBeatJobs(final String instanceId) {
Optional<ScheduleStrategy> scheduleStrategy = ScheduleContextFactory.getInstance().get(instanceId);
if (scheduleStrategy.isPresent()) {
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 625a1e3aaf9..525bef3e484 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -22,12 +22,14 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
import java.sql.SQLException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -111,15 +113,15 @@ public final class ShardingSphereMetaData {
* @param databaseName database name
*/
public void dropDatabase(final String databaseName) {
- closeResources(databases.remove(databaseName.toLowerCase()));
+ ShardingSphereDatabase toBeRemovedDatabase = databases.remove(databaseName.toLowerCase());
+ closeResources(toBeRemovedDatabase);
}
private void closeResources(final ShardingSphereDatabase database) {
- if (null != database.getResource()) {
- database.getResource().getDataSources().values().forEach(each -> database.getResource().close(each));
- }
String databaseName = database.getName();
globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
+ database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeHeartBeatJob);
+ Optional.ofNullable(database.getResource()).ifPresent(optional -> optional.getDataSources().values().forEach(each -> database.getResource().close(each)));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
index 2e8266c47dd..497c0411680 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicDataSourceContainedRule.java
@@ -53,8 +53,14 @@ public interface DynamicDataSourceContainedRule extends ShardingSphereRule {
/**
* Restart heart beat job.
+ *
* @param event data source status changed event
* @param instanceContext instance context
*/
void restartHeartBeatJob(DataSourceStatusChangedEvent event, InstanceContext instanceContext);
+
+ /**
+ * Close heart beat job.
+ */
+ void closeHeartBeatJob();
}
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
index ebfebc6a299..495a9a401ff 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/ScheduleStrategy.java
@@ -30,4 +30,11 @@ public interface ScheduleStrategy {
* @param job cron job
*/
void startSchedule(CronJob job);
+
+ /**
+ * Close schedule.
+ *
+ * @param jobName job name
+ */
+ void closeSchedule(String jobName);
}
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
index 0da283f6cdf..16b06d33f18 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/ClusterScheduleStrategy.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.schedule.core.strategy.ScheduleStrategy;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Consumer;
/**
@@ -77,6 +78,11 @@ public final class ClusterScheduleStrategy implements ScheduleStrategy {
SCHEDULE_JOB_BOOTSTRAP_MAP.get(job.getJobName()).schedule();
}
+ @Override
+ public void closeSchedule(final String jobName) {
+ Optional.ofNullable(SCHEDULE_JOB_BOOTSTRAP_MAP.remove(jobName)).ifPresent(ScheduleJobBootstrap::shutdown);
+ }
+
@SneakyThrows(ConcurrentException.class)
private CoordinatorRegistryCenter getRegistryCenter() {
return registryCenterLazyInitializer.get();
diff --git a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
index a2e6dd75a0b..d54f8bd3d38 100644
--- a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
+++ b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/strategy/type/StandaloneScheduleStrategy.java
@@ -29,4 +29,9 @@ public final class StandaloneScheduleStrategy implements ScheduleStrategy {
public void startSchedule(final CronJob job) {
// TODO
}
+
+ @Override
+ public void closeSchedule(final String jobName) {
+ // TODO
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 39689f22838..855acc8c144 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -170,6 +170,9 @@ public final class ClusterContextManagerCoordinator {
@Subscribe
public synchronized void renew(final StorageNodeChangedEvent event) {
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+ if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+ return;
+ }
Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
.getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
if (dynamicDataSourceRule.isPresent()) {
@@ -189,6 +192,9 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final PrimaryStateChangedEvent event) {
+ if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+ return;
+ }
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
.stream()