You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/02/21 09:46:12 UTC
[shardingsphere] branch master updated: Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5c7187300 Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
6e5c7187300 is described below
commit 6e5c71873007979eb0ea6eac86c56c1b0d267622
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Feb 21 17:46:04 2023 +0800
Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
* Refactor drop database stop heartbeat job and remove storage node data sources.
* Fix checkstyle
* Fix checkstyle
* Fix checkstyle
* FIX checkstyle
---
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 31 ++++++++++++++--------
.../DropDatabaseDiscoveryRuleStatementUpdater.java | 6 -----
.../rule/ReadwriteSplittingRule.java | 26 +++++++++++++++++-
.../builder/ReadwriteSplittingRuleBuilder.java | 2 +-
.../route/ReadwriteSplittingSQLRouterTest.java | 4 +--
.../rule/ReadwriteSplittingRuleTest.java | 2 +-
.../infra/metadata/ShardingSphereMetaData.java | 2 ++
.../type/StaticDataSourceContainedRule.java | 12 +++++++++
.../event/StorageNodeDataSourceDeletedEvent.java | 20 +++++++-------
.../registry/status/storage/node/StorageNode.java | 4 +--
.../subscriber/StorageNodeStatusSubscriber.java | 15 +++++++++--
.../StorageNodeStatusSubscriberTest.java | 17 +++++++++---
.../ImportDatabaseConfigurationUpdater.java | 2 +-
.../rdl/rule/RuleDefinitionBackendHandler.java | 19 +++++++++++--
14 files changed, 119 insertions(+), 43 deletions(-)
diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 252481d87d8..737392d0063 100644
--- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -44,6 +44,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.schedule.core.ScheduleContextFactory;
import javax.sql.DataSource;
@@ -119,6 +120,17 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
}
}
+ private void initHeartBeatJobs() {
+ for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+ DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+ String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
+ CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
+ rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
+ rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+ scheduleContext.startSchedule(job);
+ }
+ }
+
/**
* Get single data source rule.
*
@@ -162,24 +174,21 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(groupName);
ShardingSpherePreconditions.checkNotNull(dataSourceRule, () -> new DBDiscoveryDataSourceRuleNotFoundException(databaseName));
scheduleContext.closeSchedule(dataSourceRule.getProvider().getType() + "-" + databaseName + "-" + dataSourceRule.getGroupName());
+ deleteStorageNodeDataSources(dataSourceRule);
}
- @Override
- public void closeAllHeartBeatJob() {
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
- DatabaseDiscoveryDataSourceRule rule = entry.getValue();
- scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+ private void deleteStorageNodeDataSources(final DatabaseDiscoveryDataSourceRule dataSourceRule) {
+ for (String each : dataSourceRule.getDataSourceNames()) {
+ instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, dataSourceRule.getGroupName(), each)));
}
}
- private void initHeartBeatJobs() {
+ @Override
+ public void closeAllHeartBeatJob() {
for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
DatabaseDiscoveryDataSourceRule rule = entry.getValue();
- String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
- CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
- rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
- rule.getHeartbeatProps().getProperty("keep-alive-cron"));
- scheduleContext.startSchedule(job);
+ scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+ deleteStorageNodeDataSources(rule);
}
}
diff --git a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
index 8708c833893..feb538fd2c4 100644
--- a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
+++ b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.dbdiscovery.distsql.handler.update;
import org.apache.shardingsphere.dbdiscovery.api.config.DatabaseDiscoveryRuleConfiguration;
import org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableConstants;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableItemConstants;
import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -51,7 +50,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
String databaseName = database.getName();
checkCurrentRuleConfiguration(databaseName, sqlStatement, currentRuleConfig);
checkIsInUse(databaseName, sqlStatement, database);
- closeHeartbeatJob(database, sqlStatement);
}
private void checkCurrentRuleConfiguration(final String databaseName, final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
@@ -81,10 +79,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
ShardingSpherePreconditions.checkState(invalid.isEmpty(), () -> new RuleInUsedException(RULE_TYPE, databaseName, invalid));
}
- private void closeHeartbeatJob(final ShardingSphereDatabase database, final DropDatabaseDiscoveryRuleStatement sqlStatement) {
- sqlStatement.getNames().forEach(each -> database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(optional -> optional.closeSingleHeartBeatJob(each)));
- }
-
@Override
public boolean updateCurrentRuleConfiguration(final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
sqlStatement.getNames().forEach(each -> dropRule(currentRuleConfig, each));
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 101682eb36b..ebd843d992c 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.infra.util.expr.InlineExpressionParser;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.strategy.DynamicReadwriteSplittingStrategyConfiguration;
@@ -59,6 +60,8 @@ import java.util.stream.Collectors;
*/
public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceContainedRule, StaticDataSourceContainedRule, ExportableRule, StorageConnectorReusableRule {
+ private final String databaseName;
+
@Getter
private final RuleConfiguration configuration;
@@ -66,7 +69,8 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
private final Map<String, ReadwriteSplittingDataSourceRule> dataSourceRules;
- public ReadwriteSplittingRule(final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+ public ReadwriteSplittingRule(final String databaseName, final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+ this.databaseName = databaseName;
configuration = ruleConfig;
for (ReadwriteSplittingDataSourceRuleConfiguration dataSourceRuleConfiguration : ruleConfig.getDataSources()) {
if (ruleConfig.getLoadBalancers().containsKey(dataSourceRuleConfiguration.getLoadBalancerName())) {
@@ -176,6 +180,26 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(), DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus());
}
+ @Override
+ public void cleanStorageNodeDataSource(final String groupName) {
+ Preconditions.checkNotNull(dataSourceRules.get(groupName), String.format("`%s` group name not exist in database `%s`", groupName, databaseName));
+ deleteStorageNodeDataSources(dataSourceRules.get(groupName));
+ }
+
+ private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
+ if (rule.getReadwriteSplittingStrategy() instanceof DynamicReadwriteSplittingStrategy) {
+ return;
+ }
+ rule.getReadwriteSplittingStrategy().getReadDataSources().forEach(each -> new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each)));
+ }
+
+ @Override
+ public void cleanStorageNodeDataSources() {
+ for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
+ deleteStorageNodeDataSources(entry.getValue());
+ }
+ }
+
@Override
public Map<String, Object> getExportData() {
Map<String, Object> result = new HashMap<>(2, 1);
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
index f0d3afbcbf6..1fe9cf64e35 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
@@ -36,7 +36,7 @@ public final class ReadwriteSplittingRuleBuilder implements DatabaseRuleBuilder<
@Override
public ReadwriteSplittingRule build(final ReadwriteSplittingRuleConfiguration config, final String databaseName,
final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules, final InstanceContext instanceContext) {
- return new ReadwriteSplittingRule(config, builtRules);
+ return new ReadwriteSplittingRule(databaseName, config, builtRules);
}
@Override
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
index 0ad3206292d..26a56b8dfbd 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
@@ -86,14 +86,14 @@ public final class ReadwriteSplittingSQLRouterTest {
@Before
public void setUp() {
- staticRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
+ staticRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
new StaticReadwriteSplittingStrategyConfiguration(WRITE_DATASOURCE, Collections.singletonList(READ_DATASOURCE)), null, "")),
Collections.emptyMap()), Collections.emptyList());
sqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(staticRule)).get(staticRule);
DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class, RETURNS_DEEP_STUBS);
when(dynamicDataSourceRule.getPrimaryDataSourceName("readwrite_ds")).thenReturn(WRITE_DATASOURCE);
when(dynamicDataSourceRule.getReplicaDataSourceNames("readwrite_ds")).thenReturn(Collections.emptyList());
- dynamicRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
+ dynamicRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
new DynamicReadwriteSplittingStrategyConfiguration("readwrite_ds", "true"), "")), Collections.emptyMap()),
Collections.singleton(dynamicDataSourceRule));
dynamicSqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(dynamicRule)).get(dynamicRule);
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index a4c8ae40a45..bb117c99443 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -56,7 +56,7 @@ public final class ReadwriteSplittingRuleTest {
private ReadwriteSplittingRule createReadwriteSplittingRule() {
ReadwriteSplittingDataSourceRuleConfiguration config =
new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", new StaticReadwriteSplittingStrategyConfiguration("write_ds", Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
- return new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(
+ return new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(
Collections.singleton(config), Collections.singletonMap("random", new AlgorithmConfiguration("RANDOM", new Properties()))), Collections.emptyList());
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 11c8f301bc6..b0ae4078eef 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -125,6 +126,7 @@ public final class ShardingSphereMetaData {
globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeAllHeartBeatJob);
+ database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
Optional.ofNullable(database.getResourceMetaData()).ifPresent(optional -> optional.getDataSources().values().forEach(each -> database.getResourceMetaData().close(each)));
}
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
index b12c8e6c044..728674bf615 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
@@ -31,4 +31,16 @@ public interface StaticDataSourceContainedRule extends ShardingSphereRule {
* @param event data source status changed event
*/
void updateStatus(DataSourceStatusChangedEvent event);
+
+ /**
+ * Clean single storage node data source.
+ *
+ * @param groupName group name
+ */
+ void cleanStorageNodeDataSource(String groupName);
+
+ /**
+ * Clean storage nodes data sources.
+ */
+ void cleanStorageNodeDataSources();
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
similarity index 62%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
copy to mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
index b12c8e6c044..010ed71d979 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.rule.identifier.type;
+package org.apache.shardingsphere.mode.metadata.storage.event;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
/**
- * Static data source contained rule.
+ * Storage node data source deleted event.
*/
-public interface StaticDataSourceContainedRule extends ShardingSphereRule {
+@RequiredArgsConstructor
+@Getter
+public final class StorageNodeDataSourceDeletedEvent {
- /**
- * Update data source status.
- *
- * @param event data source status changed event
- */
- void updateStatus(DataSourceStatusChangedEvent event);
+ private final QualifiedDatabase qualifiedDatabase;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
index ef9c5003a59..4a451545dd6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
@@ -55,12 +55,12 @@ public final class StorageNode {
}
/**
- * Get storage node status path.
+ * Get storage node data source path.
*
* @param database cluster database
* @return status path of storage node
*/
- public static String getStatusPath(final QualifiedDatabase database) {
+ public static String getStorageNodeDataSourcePath(final QualifiedDatabase database) {
return String.join("/", getRootPath(), database.toString());
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
index 6548aa98c48..7aa3904c788 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
@@ -50,7 +51,7 @@ public final class StorageNodeStatusSubscriber {
*/
@Subscribe
public void update(final DataSourceDisabledEvent event) {
- repository.persist(StorageNode.getStatusPath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
+ repository.persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(event.getStorageNodeDataSource())));
}
@@ -61,7 +62,17 @@ public final class StorageNodeStatusSubscriber {
*/
@Subscribe
public void update(final PrimaryDataSourceChangedEvent event) {
- repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
+ repository.persist(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
}
+
+ /**
+ * Delete storage node data source.
+ *
+ * @param event storage node data source deleted event
+ */
+ @Subscribe
+ public void delete(final StorageNodeDataSourceDeletedEvent event) {
+ repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()));
+ }
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
index 9571e4daf0c..f721e3db0aa 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,7 +52,7 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED))));
}
@@ -63,7 +64,7 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
}
@@ -74,7 +75,17 @@ public final class StorageNodeStatusSubscriberTest {
String dataSourceName = "replica_ds_0";
PrimaryDataSourceChangedEvent event = new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
new StorageNodeStatusSubscriber(repository, eventBusContext).update(event);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
}
+
+ @Test
+ public void assertDeleteStorageNodeDataSourceDataSourceState() {
+ String databaseName = "replica_query_db";
+ String groupName = "readwrite_ds";
+ String dataSourceName = "replica_ds_0";
+ StorageNodeDataSourceDeletedEvent event = new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
+ new StorageNodeStatusSubscriber(repository, eventBusContext).delete(event);
+ verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)));
+ }
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
index 9977705be9e..0eedb5077e9 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
@@ -169,7 +169,7 @@ public final class ImportDatabaseConfigurationUpdater implements RALUpdater<Impo
ReadwriteSplittingRuleConfiguration readwriteSplittingRuleConfig = new YamlReadwriteSplittingRuleConfigurationSwapper().swapToObject((YamlReadwriteSplittingRuleConfiguration) each);
readwriteSplittingRuleConfigurationImportChecker.check(database, readwriteSplittingRuleConfig);
ruleConfigs.add(readwriteSplittingRuleConfig);
- rules.add(new ReadwriteSplittingRule(readwriteSplittingRuleConfig, rules));
+ rules.add(new ReadwriteSplittingRule(databaseName, readwriteSplittingRuleConfig, rules));
} else if (each instanceof YamlDatabaseDiscoveryRuleConfiguration) {
DatabaseDiscoveryRuleConfiguration databaseDiscoveryRuleConfig = new YamlDatabaseDiscoveryRuleConfigurationSwapper().swapToObject((YamlDatabaseDiscoveryRuleConfiguration) each);
databaseDiscoveryRuleConfigurationImportChecker.check(database, databaseDiscoveryRuleConfig);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index df4b5b33f3e..b98fab4185b 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
+import org.apache.shardingsphere.dbdiscovery.distsql.handler.update.DropDatabaseDiscoveryRuleStatementUpdater;
+import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionAlterUpdater;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionCreateUpdater;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionDropUpdater;
@@ -24,6 +26,8 @@ import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionUpdater;
import org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -31,6 +35,8 @@ import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.RDLBackendHan
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.readwritesplitting.distsql.handler.update.DropReadwriteSplittingRuleStatementUpdater;
+import org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.DropReadwriteSplittingRuleStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.util.Collection;
@@ -85,7 +91,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
result.remove(currentRuleConfig);
result.add(processAlter(sqlStatement, (RuleDefinitionAlterUpdater) updater, currentRuleConfig));
} else if (updater instanceof RuleDefinitionDropUpdater) {
- processDrop(result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
+ processDrop(database, result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
} else {
throw new UnsupportedSQLOperationException(String.format("Cannot support RDL updater type `%s`", updater.getClass().getName()));
}
@@ -110,13 +116,22 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private void processDrop(final Collection<RuleConfiguration> configs, final T sqlStatement, final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
+ private void processDrop(final ShardingSphereDatabase database, final Collection<RuleConfiguration> configs, final T sqlStatement,
+ final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
if (!updater.hasAnyOneToBeDropped(sqlStatement, currentRuleConfig)) {
return;
}
if (updater.updateCurrentRuleConfiguration(sqlStatement, currentRuleConfig)) {
configs.remove(currentRuleConfig);
}
+ if (updater instanceof DropDatabaseDiscoveryRuleStatementUpdater) {
+ database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class)
+ .ifPresent(optional -> ((DropDatabaseDiscoveryRuleStatement) sqlStatement).getNames().forEach(optional::closeSingleHeartBeatJob));
+ }
+ if (updater instanceof DropReadwriteSplittingRuleStatementUpdater) {
+ database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
+ .ifPresent(optional -> ((DropReadwriteSplittingRuleStatement) sqlStatement).getNames().forEach(optional::cleanStorageNodeDataSource));
+ }
}
private boolean getRefreshStatus(final SQLStatement sqlStatement, final RuleConfiguration currentRuleConfig, final RuleDefinitionUpdater<?, ?> updater) {