You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/11/08 00:17:11 UTC
[shardingsphere] branch master updated: Add ContextManagerSubscriberFacade (#21995)
This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f1d2d480f17 Add ContextManagerSubscriberFacade (#21995)
f1d2d480f17 is described below
commit f1d2d480f172fa78825d561661cba25113e66c50
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Nov 8 08:16:55 2022 +0800
Add ContextManagerSubscriberFacade (#21995)
* Add ResourceMetaDataCoordinator
* Add coordinator.subscriber package
* Add ContextManagerSubscriberFacade
* Add ContextManagerSubscriberFacade
---
.../cluster/ClusterContextManagerBuilder.java | 4 +-
.../coordinator/ContextManagerCoordinator.java | 363 ---------------------
.../subscriber/ConfigurationChangedSubscriber.java | 135 ++++++++
.../subscriber/ContextManagerSubscriberFacade.java | 36 ++
.../subscriber/DatabaseChangedSubscriber.java | 91 ++++++
.../subscriber/ProcessListChangedSubscriber.java | 116 +++++++
.../ResourceMetaDataChangedSubscriber.java | 103 ++++++
.../subscriber/StateChangedSubscriber.java | 140 ++++++++
.../ConfigurationChangedSubscriberTest.java} | 157 +--------
.../ProcessListChangedSubscriberTest.java | 143 ++++++++
.../ResourceMetaDataChangedSubscriberTest.java | 163 +++++++++
.../subscriber/StateChangedSubscriberTest.java | 184 +++++++++++
12 files changed, 1123 insertions(+), 512 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index e3247de01ff..0e47566a4f4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.ContextManagerCoordinator;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -86,7 +86,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
final ContextManagerBuilderParameter parameter, final ContextManager contextManager) {
contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
- new ContextManagerCoordinator(persistService, registryCenter, contextManager);
+ new ContextManagerSubscriberFacade(persistService, registryCenter, contextManager);
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
deleted file mode 100644
index 8e8d76c3bbb..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
-import org.apache.shardingsphere.mode.process.node.ProcessNode;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Context manager coordinator.
- */
-@SuppressWarnings("UnstableApiUsage")
-public final class ContextManagerCoordinator {
-
- private final MetaDataPersistService persistService;
-
- private final RegistryCenter registryCenter;
-
- private final ContextManager contextManager;
-
- public ContextManagerCoordinator(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
- this.persistService = persistService;
- this.registryCenter = registryCenter;
- this.contextManager = contextManager;
- contextManager.getInstanceContext().getEventBusContext().register(this);
- new ResourceMetaDataCoordinator(contextManager);
- disableDataSources();
- }
-
- /**
- * Renew rule configurations.
- *
- * @param event rule configurations changed event
- */
- @Subscribe
- public synchronized void renew(final RuleConfigurationsChangedEvent event) {
- if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
- contextManager.alterRuleConfiguration(event.getDatabaseName(), event.getRuleConfigurations());
- disableDataSources();
- }
- }
-
- /**
- * Renew data source configuration.
- *
- * @param event data source changed event.
- */
- @Subscribe
- public synchronized void renew(final DataSourceChangedEvent event) {
- if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
- contextManager.alterDataSourceConfiguration(event.getDatabaseName(), event.getDataSourcePropertiesMap());
- disableDataSources();
- }
- }
-
- /**
- * Renew disabled data source names.
- *
- * @param event Storage node changed event
- */
- @Subscribe
- public synchronized void renew(final StorageNodeChangedEvent event) {
- QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
- if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
- return;
- }
- Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
- .getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
- if (dynamicDataSourceRule.isPresent()) {
- ((DynamicDataSourceContainedRule) dynamicDataSourceRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
- return;
- }
- Optional<ShardingSphereRule> staticDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
- .getRules().stream().filter(each -> each instanceof StaticDataSourceContainedRule).findFirst();
- staticDataSourceRule.ifPresent(optional -> ((StaticDataSourceContainedRule) optional)
- .updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
- DataSourceStateManager.getInstance().updateState(
- qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
- }
-
- /**
- * Renew primary data source names.
- *
- * @param event primary state changed event
- */
- @Subscribe
- public synchronized void renew(final PrimaryStateChangedEvent event) {
- if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
- return;
- }
- QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
- contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
- .stream()
- .filter(each -> each instanceof DynamicDataSourceContainedRule)
- .forEach(each -> ((DynamicDataSourceContainedRule) each)
- .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
- }
-
- /**
- * Renew global rule configurations.
- *
- * @param event global rule configurations changed event
- */
- @Subscribe
- public synchronized void renew(final GlobalRuleConfigurationsChangedEvent event) {
- contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
- disableDataSources();
- }
-
- /**
- * Renew instance status.
- *
- * @param event state event
- */
- @Subscribe
- public synchronized void renew(final StateEvent event) {
- contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
- }
-
- /**
- * Renew instance labels.
- *
- * @param event label event
- */
- @Subscribe
- public synchronized void renew(final LabelsEvent event) {
- // TODO labels may be empty
- contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
- }
-
- /**
- * Renew instance list.
- *
- * @param event compute node online event
- */
- @Subscribe
- public synchronized void renew(final InstanceOnlineEvent event) {
- contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
- }
-
- /**
- * Renew instance list.
- *
- * @param event compute node offline event
- */
- @Subscribe
- public synchronized void renew(final InstanceOfflineEvent event) {
- contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
- }
-
- /**
- * Renew with new database version.
- *
- * @param event database version changed event
- */
- @Subscribe
- public synchronized void renew(final DatabaseVersionChangedEvent event) {
- Map<String, DataSourceProperties> dataSourcePropertiesMap = persistService.getDataSourceService().load(event.getDatabaseName(), event.getActiveVersion());
- Collection<RuleConfiguration> ruleConfigs = persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), event.getActiveVersion());
- contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), dataSourcePropertiesMap, ruleConfigs);
- disableDataSources();
- }
-
- /**
- * Renew properties.
- *
- * @param event properties changed event
- */
- @Subscribe
- public synchronized void renew(final PropertiesChangedEvent event) {
- contextManager.alterProperties(event.getProps());
- }
-
- /**
- * Renew to persist ShardingSphere database data.
- *
- * @param event database data added event
- */
- @Subscribe
- public synchronized void renew(final DatabaseDataAddedEvent event) {
- contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
- }
-
- /**
- * Renew to delete ShardingSphere data database.
- *
- * @param event database delete event
- */
- @Subscribe
- public synchronized void renew(final DatabaseDataDeletedEvent event) {
- contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
- }
-
- /**
- * Renew to added ShardingSphere data schema.
- *
- * @param event schema added event
- */
- @Subscribe
- public synchronized void renew(final SchemaDataAddedEvent event) {
- contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
- }
-
- /**
- * Renew to delete ShardingSphere data schema.
- *
- * @param event schema delete event
- */
- @Subscribe
- public synchronized void renew(final SchemaDataDeletedEvent event) {
- contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
- }
-
- /**
- * Renew ShardingSphere data of the table.
- *
- * @param event table data changed event
- */
- @Subscribe
- public synchronized void renew(final TableDataChangedEvent event) {
- contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
- contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
- }
-
- /**
- * Trigger show process list.
- *
- * @param event show process list trigger event
- */
- @Subscribe
- public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
- if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
- return;
- }
- Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
- if (!processContexts.isEmpty()) {
- registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
- YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
- }
- registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
- }
-
- /**
- * Trigger show process list.
- *
- * @param event show process list trigger event
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
- if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
- return;
- }
- Collection<Statement> statements = ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId());
- for (Statement statement : statements) {
- statement.cancel();
- }
- registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
- }
-
- /**
- * Complete unit show process list.
- *
- * @param event show process list unit complete event
- */
- @Subscribe
- public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
- ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
- if (null != simpleLock) {
- simpleLock.doNotify();
- }
- }
-
- /**
- * Complete unit kill process list id.
- *
- * @param event kill process list id unit complete event
- */
- @Subscribe
- public synchronized void completeUnitKillProcessListId(final KillProcessListIdUnitCompleteEvent event) {
- ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
- if (null != simpleLock) {
- simpleLock.doNotify();
- }
- }
-
- private void disableDataSources() {
- contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, value) -> value.getRuleMetaData().getRules().forEach(each -> {
- if (each instanceof StaticDataSourceContainedRule) {
- disableDataSources((StaticDataSourceContainedRule) each);
- }
- }));
- }
-
- private void disableDataSources(final StaticDataSourceContainedRule rule) {
- Map<String, StorageNodeDataSource> storageNodes = registryCenter.getStorageNodeStatusService().loadStorageNodes();
- Map<String, StorageNodeDataSource> disableDataSources = storageNodes.entrySet().stream().filter(entry -> StorageNodeStatus.isDisable(entry.getValue().getStatus()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- disableDataSources.forEach((key, value) -> rule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
- }
-}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
new file mode 100644
index 00000000000..b02dcea8072
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Configuration changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ConfigurationChangedSubscriber {
+
+ private final MetaDataPersistService persistService;
+
+ private final RegistryCenter registryCenter;
+
+ private final ContextManager contextManager;
+
+ public ConfigurationChangedSubscriber(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
+ this.persistService = persistService;
+ this.registryCenter = registryCenter;
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ disableDataSources();
+ }
+
+ /**
+ * Renew data source configuration.
+ *
+ * @param event data source changed event.
+ */
+ @Subscribe
+ public synchronized void renew(final DataSourceChangedEvent event) {
+ if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
+ contextManager.alterDataSourceConfiguration(event.getDatabaseName(), event.getDataSourcePropertiesMap());
+ disableDataSources();
+ }
+ }
+
+ /**
+ * Renew rule configurations.
+ *
+ * @param event rule configurations changed event
+ */
+ @Subscribe
+ public synchronized void renew(final RuleConfigurationsChangedEvent event) {
+ if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
+ contextManager.alterRuleConfiguration(event.getDatabaseName(), event.getRuleConfigurations());
+ disableDataSources();
+ }
+ }
+
+ /**
+ * Renew global rule configurations.
+ *
+ * @param event global rule configurations changed event
+ */
+ @Subscribe
+ public synchronized void renew(final GlobalRuleConfigurationsChangedEvent event) {
+ contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
+ disableDataSources();
+ }
+
+ /**
+ * Renew with new database version.
+ *
+ * @param event database version changed event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseVersionChangedEvent event) {
+ Map<String, DataSourceProperties> dataSourcePropertiesMap = persistService.getDataSourceService().load(event.getDatabaseName(), event.getActiveVersion());
+ Collection<RuleConfiguration> ruleConfigs = persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), event.getActiveVersion());
+ contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), dataSourcePropertiesMap, ruleConfigs);
+ disableDataSources();
+ }
+
+ /**
+ * Renew properties.
+ *
+ * @param event properties changed event
+ */
+ @Subscribe
+ public synchronized void renew(final PropertiesChangedEvent event) {
+ contextManager.alterProperties(event.getProps());
+ }
+
+ private void disableDataSources() {
+ contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, value) -> value.getRuleMetaData().getRules().forEach(each -> {
+ if (each instanceof StaticDataSourceContainedRule) {
+ disableDataSources((StaticDataSourceContainedRule) each);
+ }
+ }));
+ }
+
+ private void disableDataSources(final StaticDataSourceContainedRule rule) {
+ Map<String, StorageNodeDataSource> storageNodes = registryCenter.getStorageNodeStatusService().loadStorageNodes();
+ Map<String, StorageNodeDataSource> disableDataSources = storageNodes.entrySet().stream().filter(entry -> StorageNodeStatus.isDisable(entry.getValue().getStatus()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ disableDataSources.forEach((key, value) -> rule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
new file mode 100644
index 00000000000..783d4301ca5
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+
+/**
+ * Context manager subscriber facade.
+ */
+public final class ContextManagerSubscriberFacade {
+
+ public ContextManagerSubscriberFacade(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
+ new ConfigurationChangedSubscriber(persistService, registryCenter, contextManager);
+ new ResourceMetaDataChangedSubscriber(contextManager);
+ new DatabaseChangedSubscriber(contextManager);
+ new StateChangedSubscriber(registryCenter, contextManager);
+ new ProcessListChangedSubscriber(registryCenter, contextManager);
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
new file mode 100644
index 00000000000..8e89618a5f8
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
+
+/**
+ * Database changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class DatabaseChangedSubscriber {
+
+ private final ContextManager contextManager;
+
+ public DatabaseChangedSubscriber(final ContextManager contextManager) {
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew to persist ShardingSphere database data.
+ *
+ * @param event database data added event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseDataAddedEvent event) {
+ contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to delete ShardingSphere data database.
+ *
+ * @param event database delete event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseDataDeletedEvent event) {
+ contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to added ShardingSphere data schema.
+ *
+ * @param event schema added event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDataAddedEvent event) {
+ contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew to delete ShardingSphere data schema.
+ *
+ * @param event schema delete event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDataDeletedEvent event) {
+ contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew ShardingSphere data of the table.
+ *
+ * @param event table data changed event
+ */
+ @Subscribe
+ public synchronized void renew(final TableDataChangedEvent event) {
+ contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
+ contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
new file mode 100644
index 00000000000..1e1c774b078
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import org.apache.shardingsphere.mode.process.node.ProcessNode;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+/**
+ * Process list changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ProcessListChangedSubscriber {
+
+ private final RegistryCenter registryCenter;
+
+ private final ContextManager contextManager;
+
+ public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+ this.registryCenter = registryCenter;
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Trigger show process list.
+ *
+ * @param event show process list trigger event
+ */
+ @Subscribe
+ public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
+ if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+ return;
+ }
+ Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
+ if (!processContexts.isEmpty()) {
+ registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
+ YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
+ }
+ registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+ }
+
+ /**
+ * Trigger show process list.
+ *
+ * @param event show process list trigger event
+ * @throws SQLException SQL exception
+ */
+ @Subscribe
+ public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
+ if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+ return;
+ }
+ Collection<Statement> statements = ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId());
+ for (Statement statement : statements) {
+ statement.cancel();
+ }
+ registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+ }
+
+ /**
+ * Complete unit show process list.
+ *
+ * @param event show process list unit complete event
+ */
+ @Subscribe
+ public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
+ ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+ if (null != simpleLock) {
+ simpleLock.doNotify();
+ }
+ }
+
+ /**
+ * Complete unit kill process list id.
+ *
+ * @param event kill process list id unit complete event
+ */
+ @Subscribe
+ public synchronized void completeUnitKillProcessListId(final KillProcessListIdUnitCompleteEvent event) {
+ ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+ if (null != simpleLock) {
+ simpleLock.doNotify();
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
new file mode 100644
index 00000000000..340cb6ba862
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+
+/**
+ * Resource meta data changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ResourceMetaDataChangedSubscriber {
+
+ private final ContextManager contextManager;
+
+ public ResourceMetaDataChangedSubscriber(final ContextManager contextManager) {
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew to persist meta data.
+ *
+ * @param event database added event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseAddedEvent event) {
+ contextManager.addDatabase(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to delete database.
+ *
+ * @param event database delete event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseDeletedEvent event) {
+ contextManager.dropDatabase(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to added schema.
+ *
+ * @param event schema added event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaAddedEvent event) {
+ contextManager.addSchema(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew to delete schema.
+ *
+ * @param event schema delete event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDeletedEvent event) {
+ contextManager.dropSchema(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew meta data of the table.
+ *
+ * @param event table meta data changed event
+ */
+ @Subscribe
+ public synchronized void renew(final TableMetaDataChangedEvent event) {
+ contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableMetaData(), null);
+ contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable(), null);
+ }
+
+ /**
+ * Renew meta data of the view.
+ *
+ * @param event view meta data changed event
+ */
+ @Subscribe
+ public synchronized void renew(final ViewMetaDataChangedEvent event) {
+ contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), null, event.getChangedViewMetaData());
+ contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), null, event.getDeletedView());
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
new file mode 100644
index 00000000000..794e189ff51
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Optional;
+
+/**
+ * State changed subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class StateChangedSubscriber {
+
+ private final RegistryCenter registryCenter;
+
+ private final ContextManager contextManager;
+
+ public StateChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+ this.registryCenter = registryCenter;
+ this.contextManager = contextManager;
+ contextManager.getInstanceContext().getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew disabled data source names.
+ *
+ * @param event Storage node changed event
+ */
+ @Subscribe
+ public synchronized void renew(final StorageNodeChangedEvent event) {
+ QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+ if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+ return;
+ }
+ Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+ .getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
+ if (dynamicDataSourceRule.isPresent()) {
+ ((DynamicDataSourceContainedRule) dynamicDataSourceRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+ return;
+ }
+ Optional<ShardingSphereRule> staticDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+ .getRules().stream().filter(each -> each instanceof StaticDataSourceContainedRule).findFirst();
+ staticDataSourceRule.ifPresent(optional -> ((StaticDataSourceContainedRule) optional)
+ .updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+ DataSourceStateManager.getInstance().updateState(
+ qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
+ }
+
+ /**
+ * Renew primary data source names.
+ *
+ * @param event primary state changed event
+ */
+ @Subscribe
+ public synchronized void renew(final PrimaryStateChangedEvent event) {
+ if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+ return;
+ }
+ QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+ contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
+ .stream()
+ .filter(each -> each instanceof DynamicDataSourceContainedRule)
+ .forEach(each -> ((DynamicDataSourceContainedRule) each)
+ .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+ }
+
+ /**
+ * Renew instance status.
+ *
+ * @param event state event
+ */
+ @Subscribe
+ public synchronized void renew(final StateEvent event) {
+ contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
+ }
+
+ /**
+ * Renew instance labels.
+ *
+ * @param event label event
+ */
+ @Subscribe
+ public synchronized void renew(final LabelsEvent event) {
+ // TODO labels may be empty
+ contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
+ }
+
+ /**
+ * Renew instance list.
+ *
+ * @param event compute node online event
+ */
+ @Subscribe
+ public synchronized void renew(final InstanceOnlineEvent event) {
+ contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+ }
+
+ /**
+ * Renew instance list.
+ *
+ * @param event compute node offline event
+ */
+ @Subscribe
+ public synchronized void renew(final InstanceOfflineEvent event) {
+ contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
+ }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
similarity index 57%
rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
index a87baf3db0b..a9df46a4e70 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
@@ -27,48 +27,27 @@ import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.sqltranslator.rule.SQLTranslatorRule;
@@ -78,7 +57,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
-import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -89,26 +67,20 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class ContextManagerCoordinatorTest {
+public final class ConfigurationChangedSubscriberTest {
- private ContextManagerCoordinator coordinator;
+ private ConfigurationChangedSubscriber subscriber;
private ContextManager contextManager;
@@ -126,7 +98,7 @@ public final class ContextManagerCoordinatorTest {
contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
- coordinator = new ContextManagerCoordinator(persistService, new RegistryCenter(mock(ClusterPersistRepository.class),
+ subscriber = new ConfigurationChangedSubscriber(persistService, new RegistryCenter(mock(ClusterPersistRepository.class),
new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
}
@@ -155,25 +127,14 @@ public final class ContextManagerCoordinatorTest {
public void assertRenewForRuleConfigurationsChanged() {
when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", "0")).thenReturn(true);
assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"), is(database));
- coordinator.renew(new RuleConfigurationsChangedEvent("db", "0", Collections.emptyList()));
+ subscriber.renew(new RuleConfigurationsChangedEvent("db", "0", Collections.emptyList()));
assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"), not(database));
}
- @Test
- public void assertRenewForDisableStateChanged() {
- StaticDataSourceContainedRule staticDataSourceRule = mock(StaticDataSourceContainedRule.class);
- when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
- StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDatabase("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
- coordinator.renew(event);
- verify(staticDataSourceRule).updateStatus(argThat(
- (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) argumentEvent -> Objects.equals(event.getQualifiedDatabase(), argumentEvent.getQualifiedDatabase())
- && Objects.equals(event.getDataSource(), argumentEvent.getDataSource())));
- }
-
@Test
public void assertRenewForDataSourceChanged() {
when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", "0")).thenReturn(true);
- coordinator.renew(new DataSourceChangedEvent("db", "0", createChangedDataSourcePropertiesMap()));
+ subscriber.renew(new DataSourceChangedEvent("db", "0", createChangedDataSourcePropertiesMap()));
assertTrue(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getDataSources().containsKey("ds_2"));
}
@@ -189,7 +150,7 @@ public final class ContextManagerCoordinatorTest {
@Test
public void assertRenewForGlobalRuleConfigurationsChanged() {
GlobalRuleConfigurationsChangedEvent event = new GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
- coordinator.renew(event);
+ subscriber.renew(event);
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), not(globalRuleMetaData));
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().size(), is(3));
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().stream().filter(each -> each instanceof AuthorityRule).count(), is(1L));
@@ -209,51 +170,12 @@ public final class ContextManagerCoordinatorTest {
return result;
}
- @Test
- public void assertRenewPrimaryDataSourceName() {
- Collection<ShardingSphereRule> rules = new LinkedList<>();
- DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class);
- rules.add(dynamicDataSourceRule);
- ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
- ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
- when(database.getRuleMetaData()).thenReturn(ruleMetaData);
- contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database);
- PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
- coordinator.renew(mockPrimaryStateChangedEvent);
- verify(dynamicDataSourceRule).restartHeartBeatJob(any());
- }
-
- @Test
- public void assertRenewInstanceStatus() {
- Collection<String> testStates = new LinkedList<>();
- testStates.add(StateType.OK.name());
- StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), testStates);
- coordinator.renew(mockStateEvent);
- assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
- testStates.add(StateType.CIRCUIT_BREAK.name());
- coordinator.renew(mockStateEvent);
- assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
- }
-
- @Test
- public void assertRenewInstanceLabels() {
- Collection<String> labels = Collections.singleton("test");
- coordinator.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
- assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
- }
-
- @Test
- public void assertRenewInstanceOfflineEvent() {
- coordinator.renew(new InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
- assertThat(((ProxyInstanceMetaData) contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), is(3307));
- }
-
@Test
public void assertRenewDatabaseVersionChangedEvent() {
when(persistService.getDataSourceService().load("db", "1")).thenReturn(getVersionChangedDataSourcePropertiesMap());
when(persistService.getDatabaseRulePersistService().load("db", "1")).thenReturn(Collections.emptyList());
Map<String, DataSource> dataSourceMap = initContextManager();
- coordinator.renew(new DatabaseVersionChangedEvent("db", "1"));
+ subscriber.renew(new DatabaseVersionChangedEvent("db", "1"));
assertThat(contextManager.getDataSourceMap("db").get("ds_0"), is(dataSourceMap.get("ds_0")));
assertNotNull(contextManager.getDataSourceMap("db").get("ds_1"));
assertThat(DataSourcePropertiesCreator.create(getChangedDataSource()), is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("ds_1"))));
@@ -261,73 +183,14 @@ public final class ContextManagerCoordinatorTest {
assertThat(DataSourcePropertiesCreator.create(new MockedDataSource()), is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("primary_ds"))));
}
- @Test
- public void assertRenewInstanceOnlineEvent() {
- InstanceMetaData instanceMetaData1 = new ProxyInstanceMetaData("foo_instance_3307", 3307);
- InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceMetaData1);
- coordinator.renew(instanceOnlineEvent1);
- assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(1));
- assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(), is(instanceMetaData1));
- InstanceMetaData instanceMetaData2 = new ProxyInstanceMetaData("foo_instance_3308", 3308);
- InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceMetaData2);
- coordinator.renew(instanceOnlineEvent2);
- assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
- assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData2));
- coordinator.renew(instanceOnlineEvent1);
- assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
- assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData1));
- }
-
@Test
public void assertRenewProperties() {
Properties props = new Properties();
props.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), Boolean.TRUE.toString());
- coordinator.renew(new PropertiesChangedEvent(props));
+ subscriber.renew(new PropertiesChangedEvent(props));
assertThat(contextManager.getMetaDataContexts().getMetaData().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()), is(Boolean.TRUE.toString()));
}
- @Test
- public void assertCompleteUnitShowProcessList() {
- String processListId = "foo_process_id";
- ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
- ShowProcessListManager.getInstance().getLocks().put(processListId, lock);
- long startTime = System.currentTimeMillis();
- ExecutorService executorService = Executors.newFixedThreadPool(1);
- executorService.submit(() -> {
- try {
- Thread.sleep(50L);
- } catch (final InterruptedException ignored) {
- }
- coordinator.completeUnitShowProcessList(new ShowProcessListUnitCompleteEvent(processListId));
- });
- lockAndAwaitDefaultTime(lock);
- long currentTime = System.currentTimeMillis();
- assertTrue(currentTime >= startTime + 50L);
- assertTrue(currentTime <= startTime + 5000L);
- ShowProcessListManager.getInstance().getLocks().remove(processListId);
- }
-
- @Test
- public void assertTriggerShowProcessList() throws NoSuchFieldException, IllegalAccessException {
- String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
- ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", mock(ExecuteProcessContext.class));
- String processListId = "foo_process_id";
- coordinator.triggerShowProcessList(new ShowProcessListTriggerEvent(instanceId, processListId));
- ClusterPersistRepository repository = ReflectionUtil.getFieldValue(coordinator, "registryCenter", RegistryCenter.class).getRepository();
- verify(repository).persist("/execution_nodes/foo_process_id/" + instanceId,
- "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" + System.lineSeparator());
- verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_process_id");
- }
-
- private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) {
- lock.lock();
- try {
- lock.awaitDefaultTime();
- } finally {
- lock.unlock();
- }
- }
-
private Map<String, DataSource> initContextManager() {
Map<String, DataSource> result = getDataSourceMap();
ShardingSphereResourceMetaData resourceMetaData = new ShardingSphereResourceMetaData("sharding_db", result);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
new file mode 100644
index 00000000000..b7cab8eddf6
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ProcessListChangedSubscriberTest {
+
+ private ProcessListChangedSubscriber subscriber;
+
+ private ContextManager contextManager;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private ShardingSphereDatabase database;
+
+ @Before
+ public void setUp() throws SQLException {
+ contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+ contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+ contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+ subscriber = new ProcessListChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+ }
+
+ private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+ ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+ InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+ return new ContextManagerBuilderParameter(modeConfig, Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), instanceMetaData, false);
+ }
+
+ private Map<String, ShardingSphereDatabase> createDatabases() {
+ when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+ when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+ when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+ when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+ when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+ when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+ when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+ Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+ result.put("db", database);
+ return result;
+ }
+
+ @Test
+ public void assertCompleteUnitShowProcessList() {
+ String processListId = "foo_process_id";
+ ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
+ ShowProcessListManager.getInstance().getLocks().put(processListId, lock);
+ long startTime = System.currentTimeMillis();
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException ignored) {
+ }
+ subscriber.completeUnitShowProcessList(new ShowProcessListUnitCompleteEvent(processListId));
+ });
+ lockAndAwaitDefaultTime(lock);
+ long currentTime = System.currentTimeMillis();
+ assertTrue(currentTime >= startTime + 50L);
+ assertTrue(currentTime <= startTime + 5000L);
+ ShowProcessListManager.getInstance().getLocks().remove(processListId);
+ }
+
+ @Test
+ public void assertTriggerShowProcessList() throws NoSuchFieldException, IllegalAccessException {
+ String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
+ ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", mock(ExecuteProcessContext.class));
+ String processListId = "foo_process_id";
+ subscriber.triggerShowProcessList(new ShowProcessListTriggerEvent(instanceId, processListId));
+ ClusterPersistRepository repository = ReflectionUtil.getFieldValue(subscriber, "registryCenter", RegistryCenter.class).getRepository();
+ verify(repository).persist("/execution_nodes/foo_process_id/" + instanceId,
+ "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" + System.lineSeparator());
+ verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_process_id");
+ }
+
+ private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) {
+ lock.lock();
+ try {
+ lock.awaitDefaultTime();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
new file mode 100644
index 00000000000..43097de6d25
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.test.mock.MockedDataSource;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ResourceMetaDataChangedSubscriberTest {
+
+ private ResourceMetaDataChangedSubscriber subscriber;
+
+ private ContextManager contextManager;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private MetaDataPersistService persistService;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private ShardingSphereDatabase database;
+
+ @Before
+ public void setUp() throws SQLException {
+ contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+ contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+ contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+ subscriber = new ResourceMetaDataChangedSubscriber(contextManager);
+ }
+
+ private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+ ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+ InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+ return new ContextManagerBuilderParameter(modeConfig, Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), instanceMetaData, false);
+ }
+
+ private Map<String, ShardingSphereDatabase> createDatabases() {
+ when(database.getName()).thenReturn("db");
+ when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+ when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+ when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+ when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+ when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+ when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+ when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+ Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+ result.put("db", database);
+ return result;
+ }
+
+ @Test
+ public void assertRenewForDatabaseAdded() {
+ when(persistService.getDataSourceService().load("db_added")).thenReturn(createDataSourcePropertiesMap());
+ when(persistService.getDatabaseRulePersistService().load("db_added")).thenReturn(Collections.emptyList());
+ subscriber.renew(new DatabaseAddedEvent("db_added"));
+ assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db_added").getResourceMetaData().getDataSources());
+ }
+
+ private Map<String, DataSourceProperties> createDataSourcePropertiesMap() {
+ MockedDataSource dataSource = new MockedDataSource();
+ Map<String, DataSourceProperties> result = new LinkedHashMap<>(3, 1);
+ result.put("primary_ds", DataSourcePropertiesCreator.create(dataSource));
+ result.put("replica_ds_0", DataSourcePropertiesCreator.create(dataSource));
+ result.put("replica_ds_1", DataSourcePropertiesCreator.create(dataSource));
+ return result;
+ }
+
+ @Test
+ public void assertRenewForDatabaseDeleted() {
+ subscriber.renew(new DatabaseDeletedEvent("db"));
+ assertNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"));
+ }
+
+ @Test
+ public void assertRenewForSchemaAdded() {
+ subscriber.renew(new SchemaAddedEvent("db", "foo_schema"));
+ verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).putSchema(argThat(argument -> argument.equals("foo_schema")), any(ShardingSphereSchema.class));
+ }
+
+ @Test
+ public void assertRenewForSchemaDeleted() {
+ when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("foo_schema")).thenReturn(true);
+ subscriber.renew(new SchemaDeletedEvent("db", "foo_schema"));
+ verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).removeSchema("foo_schema");
+ }
+
+ @Test
+ public void assertRenewForTableMetaDataChangedChanged() {
+ when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("db")).thenReturn(true);
+ ShardingSphereTable changedTableMetaData = new ShardingSphereTable("t_order", Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+ TableMetaDataChangedEvent event = new TableMetaDataChangedEvent("db", "db", changedTableMetaData, null);
+ subscriber.renew(event);
+ verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db")).putTable("t_order", event.getChangedTableMetaData());
+ }
+
+ @Test
+ public void assertRenewForViewMetaDataChanged() {
+ when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("db")).thenReturn(true);
+ ShardingSphereView changedViewMetaData = new ShardingSphereView("t_order_view", "");
+ ViewMetaDataChangedEvent event = new ViewMetaDataChangedEvent("db", "db", changedViewMetaData, null);
+ subscriber.renew(event);
+ verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db")).putView("t_order_view", event.getChangedViewMetaData());
+ }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
new file mode 100644
index 00000000000..ba5f3eecbb8
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.infra.state.StateType;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class StateChangedSubscriberTest {
+
+ private StateChangedSubscriber subscriber;
+
+ private ContextManager contextManager;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private ShardingSphereDatabase database;
+
+ @Before
+ public void setUp() throws SQLException {
+ contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+ contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+ contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+ subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class),
+ new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+ }
+
+ private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+ ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+ InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+ return new ContextManagerBuilderParameter(modeConfig, Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), instanceMetaData, false);
+ }
+
+ private Map<String, ShardingSphereDatabase> createDatabases() {
+ when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+ when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+ when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+ when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+ when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+ when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+ when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+ Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+ result.put("db", database);
+ return result;
+ }
+
+ @Test
+ public void assertRenewForDisableStateChanged() {
+ StaticDataSourceContainedRule staticDataSourceRule = mock(StaticDataSourceContainedRule.class);
+ when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
+ StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDatabase("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
+ subscriber.renew(event);
+ verify(staticDataSourceRule).updateStatus(argThat(
+ (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) argumentEvent -> Objects.equals(event.getQualifiedDatabase(), argumentEvent.getQualifiedDatabase())
+ && Objects.equals(event.getDataSource(), argumentEvent.getDataSource())));
+ }
+
+ @Test
+ public void assertRenewPrimaryDataSourceName() {
+ Collection<ShardingSphereRule> rules = new LinkedList<>();
+ DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class);
+ rules.add(dynamicDataSourceRule);
+ ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+ when(database.getRuleMetaData()).thenReturn(ruleMetaData);
+ contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database);
+ PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
+ subscriber.renew(mockPrimaryStateChangedEvent);
+ verify(dynamicDataSourceRule).restartHeartBeatJob(any());
+ }
+
+ @Test
+ public void assertRenewInstanceStatus() {
+ Collection<String> testStates = new LinkedList<>();
+ testStates.add(StateType.OK.name());
+ StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), testStates);
+ subscriber.renew(mockStateEvent);
+ assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
+ testStates.add(StateType.CIRCUIT_BREAK.name());
+ subscriber.renew(mockStateEvent);
+ assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
+ }
+
+ @Test
+ public void assertRenewInstanceLabels() {
+ Collection<String> labels = Collections.singleton("test");
+ subscriber.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
+ assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
+ }
+
+ @Test
+ public void assertRenewInstanceOfflineEvent() {
+ subscriber.renew(new InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
+ assertThat(((ProxyInstanceMetaData) contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), is(3307));
+ }
+
+ @Test
+ public void assertRenewInstanceOnlineEvent() {
+ InstanceMetaData instanceMetaData1 = new ProxyInstanceMetaData("foo_instance_3307", 3307);
+ InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceMetaData1);
+ subscriber.renew(instanceOnlineEvent1);
+ assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(1));
+ assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(), is(instanceMetaData1));
+ InstanceMetaData instanceMetaData2 = new ProxyInstanceMetaData("foo_instance_3308", 3308);
+ InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceMetaData2);
+ subscriber.renew(instanceOnlineEvent2);
+ assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
+ assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData2));
+ subscriber.renew(instanceOnlineEvent1);
+ assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
+ assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData1));
+ }
+}