You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/11/08 00:17:11 UTC

[shardingsphere] branch master updated: Add ContextManagerSubscriberFacade (#21995)

This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d2d480f17 Add ContextManagerSubscriberFacade (#21995)
f1d2d480f17 is described below

commit f1d2d480f172fa78825d561661cba25113e66c50
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Nov 8 08:16:55 2022 +0800

    Add ContextManagerSubscriberFacade (#21995)
    
    * Add ResourceMetaDataCoordinator
    
    * Add coordinator.subscriber package
    
    * Add ContextManagerSubscriberFacade
    
    * Add ContextManagerSubscriberFacade
---
 .../cluster/ClusterContextManagerBuilder.java      |   4 +-
 .../coordinator/ContextManagerCoordinator.java     | 363 ---------------------
 .../subscriber/ConfigurationChangedSubscriber.java | 135 ++++++++
 .../subscriber/ContextManagerSubscriberFacade.java |  36 ++
 .../subscriber/DatabaseChangedSubscriber.java      |  91 ++++++
 .../subscriber/ProcessListChangedSubscriber.java   | 116 +++++++
 .../ResourceMetaDataChangedSubscriber.java         | 103 ++++++
 .../subscriber/StateChangedSubscriber.java         | 140 ++++++++
 .../ConfigurationChangedSubscriberTest.java}       | 157 +--------
 .../ProcessListChangedSubscriberTest.java          | 143 ++++++++
 .../ResourceMetaDataChangedSubscriberTest.java     | 163 +++++++++
 .../subscriber/StateChangedSubscriberTest.java     | 184 +++++++++++
 12 files changed, 1123 insertions(+), 512 deletions(-)

diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index e3247de01ff..0e47566a4f4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.ContextManagerCoordinator;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -86,7 +86,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
                                 final ContextManagerBuilderParameter parameter, final ContextManager contextManager) {
         contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
         contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
-        new ContextManagerCoordinator(persistService, registryCenter, contextManager);
+        new ContextManagerSubscriberFacade(persistService, registryCenter, contextManager);
         registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
deleted file mode 100644
index 8e8d76c3bbb..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
-import org.apache.shardingsphere.mode.process.node.ProcessNode;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Context manager coordinator.
- */
-@SuppressWarnings("UnstableApiUsage")
-public final class ContextManagerCoordinator {
-    
-    private final MetaDataPersistService persistService;
-    
-    private final RegistryCenter registryCenter;
-    
-    private final ContextManager contextManager;
-    
-    public ContextManagerCoordinator(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
-        this.persistService = persistService;
-        this.registryCenter = registryCenter;
-        this.contextManager = contextManager;
-        contextManager.getInstanceContext().getEventBusContext().register(this);
-        new ResourceMetaDataCoordinator(contextManager);
-        disableDataSources();
-    }
-    
-    /**
-     * Renew rule configurations.
-     *
-     * @param event rule configurations changed event
-     */
-    @Subscribe
-    public synchronized void renew(final RuleConfigurationsChangedEvent event) {
-        if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
-            contextManager.alterRuleConfiguration(event.getDatabaseName(), event.getRuleConfigurations());
-            disableDataSources();
-        }
-    }
-    
-    /**
-     * Renew data source configuration.
-     *
-     * @param event data source changed event.
-     */
-    @Subscribe
-    public synchronized void renew(final DataSourceChangedEvent event) {
-        if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
-            contextManager.alterDataSourceConfiguration(event.getDatabaseName(), event.getDataSourcePropertiesMap());
-            disableDataSources();
-        }
-    }
-    
-    /**
-     * Renew disabled data source names.
-     *
-     * @param event Storage node changed event
-     */
-    @Subscribe
-    public synchronized void renew(final StorageNodeChangedEvent event) {
-        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
-            return;
-        }
-        Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
-        if (dynamicDataSourceRule.isPresent()) {
-            ((DynamicDataSourceContainedRule) dynamicDataSourceRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
-            return;
-        }
-        Optional<ShardingSphereRule> staticDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof StaticDataSourceContainedRule).findFirst();
-        staticDataSourceRule.ifPresent(optional -> ((StaticDataSourceContainedRule) optional)
-                .updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
-        DataSourceStateManager.getInstance().updateState(
-                qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
-    }
-    
-    /**
-     * Renew primary data source names.
-     *
-     * @param event primary state changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PrimaryStateChangedEvent event) {
-        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
-            return;
-        }
-        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
-                .stream()
-                .filter(each -> each instanceof DynamicDataSourceContainedRule)
-                .forEach(each -> ((DynamicDataSourceContainedRule) each)
-                        .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
-    }
-    
-    /**
-     * Renew global rule configurations.
-     *
-     * @param event global rule configurations changed event
-     */
-    @Subscribe
-    public synchronized void renew(final GlobalRuleConfigurationsChangedEvent event) {
-        contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
-        disableDataSources();
-    }
-    
-    /**
-     * Renew instance status.
-     *
-     * @param event state event
-     */
-    @Subscribe
-    public synchronized void renew(final StateEvent event) {
-        contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
-    }
-    
-    /**
-     * Renew instance labels.
-     * 
-     * @param event label event
-     */
-    @Subscribe
-    public synchronized void renew(final LabelsEvent event) {
-        // TODO labels may be empty
-        contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
-    }
-    
-    /**
-     * Renew instance list.
-     *
-     * @param event compute node online event
-     */
-    @Subscribe
-    public synchronized void renew(final InstanceOnlineEvent event) {
-        contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
-    }
-    
-    /**
-     * Renew instance list.
-     *
-     * @param event compute node offline event
-     */
-    @Subscribe
-    public synchronized void renew(final InstanceOfflineEvent event) {
-        contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
-    }
-    
-    /**
-     * Renew with new database version.
-     *
-     * @param event database version changed event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseVersionChangedEvent event) {
-        Map<String, DataSourceProperties> dataSourcePropertiesMap = persistService.getDataSourceService().load(event.getDatabaseName(), event.getActiveVersion());
-        Collection<RuleConfiguration> ruleConfigs = persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), event.getActiveVersion());
-        contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), dataSourcePropertiesMap, ruleConfigs);
-        disableDataSources();
-    }
-    
-    /**
-     * Renew properties.
-     *
-     * @param event properties changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PropertiesChangedEvent event) {
-        contextManager.alterProperties(event.getProps());
-    }
-    
-    /**
-     * Renew to persist ShardingSphere database data.
-     *
-     * @param event database data added event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseDataAddedEvent event) {
-        contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
-    }
-    
-    /**
-     * Renew to delete ShardingSphere data database.
-     *
-     * @param event database delete event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseDataDeletedEvent event) {
-        contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
-    }
-    
-    /**
-     * Renew to added ShardingSphere data schema.
-     *
-     * @param event schema added event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaDataAddedEvent event) {
-        contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
-    }
-    
-    /**
-     * Renew to delete ShardingSphere data schema.
-     *
-     * @param event schema delete event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaDataDeletedEvent event) {
-        contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
-    }
-    
-    /**
-     * Renew ShardingSphere data of the table.
-     *
-     * @param event table data changed event
-     */
-    @Subscribe
-    public synchronized void renew(final TableDataChangedEvent event) {
-        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
-        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
-    }
-    
-    /**
-     * Trigger show process list.
-     *
-     * @param event show process list trigger event
-     */
-    @Subscribe
-    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
-        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
-            return;
-        }
-        Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
-        if (!processContexts.isEmpty()) {
-            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
-                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
-        }
-        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
-    }
-    
-    /**
-     * Trigger show process list.
-     *
-     * @param event show process list trigger event
-     * @throws SQLException SQL exception
-     */
-    @Subscribe
-    public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
-        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
-            return;
-        }
-        Collection<Statement> statements = ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId());
-        for (Statement statement : statements) {
-            statement.cancel();
-        }
-        registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
-    }
-    
-    /**
-     * Complete unit show process list.
-     *
-     * @param event show process list unit complete event
-     */
-    @Subscribe
-    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
-        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
-        if (null != simpleLock) {
-            simpleLock.doNotify();
-        }
-    }
-    
-    /**
-     * Complete unit kill process list id.
-     *
-     * @param event kill process list id unit complete event
-     */
-    @Subscribe
-    public synchronized void completeUnitKillProcessListId(final KillProcessListIdUnitCompleteEvent event) {
-        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
-        if (null != simpleLock) {
-            simpleLock.doNotify();
-        }
-    }
-    
-    private void disableDataSources() {
-        contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, value) -> value.getRuleMetaData().getRules().forEach(each -> {
-            if (each instanceof StaticDataSourceContainedRule) {
-                disableDataSources((StaticDataSourceContainedRule) each);
-            }
-        }));
-    }
-    
-    private void disableDataSources(final StaticDataSourceContainedRule rule) {
-        Map<String, StorageNodeDataSource> storageNodes = registryCenter.getStorageNodeStatusService().loadStorageNodes();
-        Map<String, StorageNodeDataSource> disableDataSources = storageNodes.entrySet().stream().filter(entry -> StorageNodeStatus.isDisable(entry.getValue().getStatus()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        disableDataSources.forEach((key, value) -> rule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
-    }
-}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
new file mode 100644
index 00000000000..b02dcea8072
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Configuration changed subscriber: registers itself on the event bus and applies configuration change events to the context manager.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ConfigurationChangedSubscriber {
+    
+    private final MetaDataPersistService persistService;
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    public ConfigurationChangedSubscriber(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
+        this.persistService = persistService;
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+        disableDataSources(); // on construction, push storage nodes already marked disabled in the registry center to the static data source rules
+    }
+    
+    /**
+     * Renew data source configuration if the event's database version is the active version, then re-apply disabled storage node statuses.
+     *
+     * @param event data source changed event
+     */
+    @Subscribe
+    public synchronized void renew(final DataSourceChangedEvent event) {
+        if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
+            contextManager.alterDataSourceConfiguration(event.getDatabaseName(), event.getDataSourcePropertiesMap());
+            disableDataSources();
+        }
+    }
+    
+    /**
+     * Renew rule configurations if the event's database version is the active version, then re-apply disabled storage node statuses.
+     *
+     * @param event rule configurations changed event
+     */
+    @Subscribe
+    public synchronized void renew(final RuleConfigurationsChangedEvent event) {
+        if (persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(), event.getDatabaseVersion())) {
+            contextManager.alterRuleConfiguration(event.getDatabaseName(), event.getRuleConfigurations());
+            disableDataSources();
+        }
+    }
+    
+    /**
+     * Renew global rule configurations (no version check is made), then re-apply disabled storage node statuses.
+     *
+     * @param event global rule configurations changed event
+     */
+    @Subscribe
+    public synchronized void renew(final GlobalRuleConfigurationsChangedEvent event) {
+        contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
+        disableDataSources();
+    }
+    
+    /**
+     * Renew with new database version: loads the data sources and rules of the event's active version from the persist service and applies both.
+     *
+     * @param event database version changed event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseVersionChangedEvent event) {
+        Map<String, DataSourceProperties> dataSourcePropertiesMap = persistService.getDataSourceService().load(event.getDatabaseName(), event.getActiveVersion());
+        Collection<RuleConfiguration> ruleConfigs = persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), event.getActiveVersion());
+        contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), dataSourcePropertiesMap, ruleConfigs);
+        disableDataSources();
+    }
+    
+    /**
+     * Renew properties.
+     *
+     * @param event properties changed event
+     */
+    @Subscribe
+    public synchronized void renew(final PropertiesChangedEvent event) {
+        contextManager.alterProperties(event.getProps());
+    }
+    
+    private void disableDataSources() {
+        contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, value) -> value.getRuleMetaData().getRules().forEach(each -> {
+            if (each instanceof StaticDataSourceContainedRule) {
+                disableDataSources((StaticDataSourceContainedRule) each);
+            }
+        }));
+    }
+    
+    private void disableDataSources(final StaticDataSourceContainedRule rule) {
+        Map<String, StorageNodeDataSource> storageNodes = registryCenter.getStorageNodeStatusService().loadStorageNodes(); // NOTE(review): reloaded for every rule; could be loaded once by the caller
+        Map<String, StorageNodeDataSource> disableDataSources = storageNodes.entrySet().stream().filter(entry -> StorageNodeStatus.isDisable(entry.getValue().getStatus()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        disableDataSources.forEach((key, value) -> rule.updateStatus(new StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
new file mode 100644
index 00000000000..783d4301ca5
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+
+/**
+ * Context manager subscriber facade: creates the configuration, resource meta data, database, state and process list changed subscribers.
+ */
+public final class ContextManagerSubscriberFacade {
+    
+    public ContextManagerSubscriberFacade(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManager contextManager) {
+        new ConfigurationChangedSubscriber(persistService, registryCenter, contextManager); // not retained: it registers itself on the event bus in its constructor
+        new ResourceMetaDataChangedSubscriber(contextManager);
+        new DatabaseChangedSubscriber(contextManager); // not retained: it registers itself on the event bus in its constructor
+        new StateChangedSubscriber(registryCenter, contextManager);
+        new ProcessListChangedSubscriber(registryCenter, contextManager);
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
new file mode 100644
index 00000000000..8e89618a5f8
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
+
+/**
+ * Database changed subscriber, which forwards ShardingSphere database, schema and table data events to the context manager.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class DatabaseChangedSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    public DatabaseChangedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew by adding ShardingSphere database data.
+     *
+     * @param event database data added event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataAddedEvent event) {
+        contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew by dropping ShardingSphere database data.
+     *
+     * @param event database data deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataDeletedEvent event) {
+        contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew by adding ShardingSphere schema data.
+     *
+     * @param event schema data added event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataAddedEvent event) {
+        contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew by dropping ShardingSphere schema data.
+     *
+     * @param event schema data deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataDeletedEvent event) {
+        contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew ShardingSphere table data, passing first the changed table data and then the deleted table of the event to the context manager.
+     *
+     * @param event table data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final TableDataChangedEvent event) {
+        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
+        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
new file mode 100644
index 00000000000..1e1c774b078
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import org.apache.shardingsphere.mode.process.node.ProcessNode;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+/**
+ * Process list changed subscriber, which handles show process list and kill process list id events.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ProcessListChangedSubscriber {
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Trigger show process list on the instance addressed by the event: persist its process contexts if there are any, then delete the process trigger node path.
+     *
+     * @param event show process list trigger event
+     */
+    @Subscribe
+    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
+        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+            return;
+        }
+        Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
+        if (!processContexts.isEmpty()) {
+            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
+                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+    }
+    
+    /**
+     * Kill process list id on the instance addressed by the event: cancel its statements, then delete the process kill node path.
+     *
+     * @param event kill process list id event
+     * @throws SQLException if cancelling a statement fails
+     */
+    @Subscribe
+    public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
+        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
+            return;
+        }
+        Collection<Statement> statements = ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId());
+        for (Statement statement : statements) {
+            statement.cancel();
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+    }
+    
+    /**
+     * Complete unit show process list by notifying the lock held for the process list id, if one exists.
+     *
+     * @param event show process list unit complete event
+     */
+    @Subscribe
+    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
+        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+        if (null != simpleLock) {
+            simpleLock.doNotify();
+        }
+    }
+    
+    /**
+     * Complete unit kill process list id by notifying the lock held for the process list id, if one exists.
+     *
+     * @param event kill process list id unit complete event
+     */
+    @Subscribe
+    public synchronized void completeUnitKillProcessListId(final KillProcessListIdUnitCompleteEvent event) {
+        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+        if (null != simpleLock) {
+            simpleLock.doNotify();
+        }
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
new file mode 100644
index 00000000000..340cb6ba862
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+
+/**
+ * Resource meta data changed subscriber, which forwards database, schema, table and view meta data events to the context manager.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ResourceMetaDataChangedSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    public ResourceMetaDataChangedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew by adding database.
+     *
+     * @param event database added event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseAddedEvent event) {
+        contextManager.addDatabase(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew by dropping database.
+     *
+     * @param event database deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDeletedEvent event) {
+        contextManager.dropDatabase(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew by adding schema.
+     *
+     * @param event schema added event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaAddedEvent event) {
+        contextManager.addSchema(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew by dropping schema.
+     *
+     * @param event schema deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDeletedEvent event) {
+        contextManager.dropSchema(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew meta data of the table, passing first the changed table meta data and then the deleted table of the event to the context manager.
+     *
+     * @param event table meta data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final TableMetaDataChangedEvent event) {
+        contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableMetaData(), null);
+        contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable(), null);
+    }
+    
+    /**
+     * Renew meta data of the view, passing first the changed view meta data and then the deleted view of the event to the context manager.
+     *
+     * @param event view meta data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final ViewMetaDataChangedEvent event) {
+        contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), null, event.getChangedViewMetaData());
+        contextManager.alterSchema(event.getDatabaseName(), event.getSchemaName(), null, event.getDeletedView());
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
new file mode 100644
index 00000000000..794e189ff51
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Optional;
+
+/**
+ * State changed subscriber, which applies storage node, primary data source and compute node instance events to the context manager.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class StateChangedSubscriber {
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    public StateChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew storage node data source status; a dynamic data source rule, when present, is updated alone and the data source state manager is left untouched.
+     *
+     * @param event Storage node changed event
+     */
+    @Subscribe
+    public synchronized void renew(final StorageNodeChangedEvent event) {
+        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDatabase.getDatabaseName())) {
+            return;
+        }
+        Optional<ShardingSphereRule> dynamicDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+                .getRules().stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();
+        if (dynamicDataSourceRule.isPresent()) {
+            ((DynamicDataSourceContainedRule) dynamicDataSourceRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+            return;
+        }
+        Optional<ShardingSphereRule> staticDataSourceRule = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+                .getRules().stream().filter(each -> each instanceof StaticDataSourceContainedRule).findFirst();
+        staticDataSourceRule.ifPresent(optional -> ((StaticDataSourceContainedRule) optional).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+        // Upper-case with Locale.ROOT so that the enum constant lookup does not depend on the default locale of the JVM.
+        DataSourceStateManager.getInstance().updateState(
+                qualifiedDatabase.getDatabaseName(), qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase(java.util.Locale.ROOT)));
+    }
+    
+    /**
+     * Renew primary data source by restarting the heart beat job of every dynamic data source contained rule of the database.
+     *
+     * @param event primary state changed event
+     */
+    @Subscribe
+    public synchronized void renew(final PrimaryStateChangedEvent event) {
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+            return;
+        }
+        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
+                .stream()
+                .filter(each -> each instanceof DynamicDataSourceContainedRule)
+                .forEach(each -> ((DynamicDataSourceContainedRule) each)
+                        .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+    }
+    
+    /**
+     * Renew instance status.
+     *
+     * @param event state event
+     */
+    @Subscribe
+    public synchronized void renew(final StateEvent event) {
+        contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
+    }
+    
+    /**
+     * Renew instance labels.
+     *
+     * @param event label event
+     */
+    @Subscribe
+    public synchronized void renew(final LabelsEvent event) {
+        // TODO labels may be empty
+        contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
+    }
+    
+    /**
+     * Renew instance list by adding the compute node instance loaded from the registry center for the online instance.
+     *
+     * @param event compute node online event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOnlineEvent event) {
+        contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+    }
+    
+    /**
+     * Renew instance list by deleting the compute node instance of the offline instance.
+     *
+     * @param event compute node offline event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOfflineEvent event) {
+        contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
+    }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
similarity index 57%
rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
index a87baf3db0b..a9df46a4e70 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
@@ -27,48 +27,27 @@ import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
-import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.sqltranslator.rule.SQLTranslatorRule;
@@ -78,7 +57,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -89,26 +67,20 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class ContextManagerCoordinatorTest {
+public final class ConfigurationChangedSubscriberTest {
     
-    private ContextManagerCoordinator coordinator;
+    private ConfigurationChangedSubscriber subscriber;
     
     private ContextManager contextManager;
     
@@ -126,7 +98,7 @@ public final class ContextManagerCoordinatorTest {
         contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
         contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
                 contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
-        coordinator = new ContextManagerCoordinator(persistService, new RegistryCenter(mock(ClusterPersistRepository.class),
+        subscriber = new ConfigurationChangedSubscriber(persistService, new RegistryCenter(mock(ClusterPersistRepository.class),
                 new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
     }
     
@@ -155,25 +127,14 @@ public final class ContextManagerCoordinatorTest {
     public void assertRenewForRuleConfigurationsChanged() {
         when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", "0")).thenReturn(true);
         assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"), is(database));
-        coordinator.renew(new RuleConfigurationsChangedEvent("db", "0", Collections.emptyList()));
+        subscriber.renew(new RuleConfigurationsChangedEvent("db", "0", Collections.emptyList()));
         assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"), not(database));
     }
     
-    @Test
-    public void assertRenewForDisableStateChanged() {
-        StaticDataSourceContainedRule staticDataSourceRule = mock(StaticDataSourceContainedRule.class);
-        when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
-        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDatabase("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
-        coordinator.renew(event);
-        verify(staticDataSourceRule).updateStatus(argThat(
-                (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) argumentEvent -> Objects.equals(event.getQualifiedDatabase(), argumentEvent.getQualifiedDatabase())
-                        && Objects.equals(event.getDataSource(), argumentEvent.getDataSource())));
-    }
-    
     @Test
     public void assertRenewForDataSourceChanged() {
         when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", "0")).thenReturn(true);
-        coordinator.renew(new DataSourceChangedEvent("db", "0", createChangedDataSourcePropertiesMap()));
+        subscriber.renew(new DataSourceChangedEvent("db", "0", createChangedDataSourcePropertiesMap()));
         assertTrue(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getDataSources().containsKey("ds_2"));
     }
     
@@ -189,7 +150,7 @@ public final class ContextManagerCoordinatorTest {
     @Test
     public void assertRenewForGlobalRuleConfigurationsChanged() {
         GlobalRuleConfigurationsChangedEvent event = new GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
-        coordinator.renew(event);
+        subscriber.renew(event);
         assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), not(globalRuleMetaData));
         assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().size(), is(3));
         assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().stream().filter(each -> each instanceof AuthorityRule).count(), is(1L));
@@ -209,51 +170,12 @@ public final class ContextManagerCoordinatorTest {
         return result;
     }
     
-    @Test
-    public void assertRenewPrimaryDataSourceName() {
-        Collection<ShardingSphereRule> rules = new LinkedList<>();
-        DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class);
-        rules.add(dynamicDataSourceRule);
-        ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
-        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
-        when(database.getRuleMetaData()).thenReturn(ruleMetaData);
-        contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database);
-        PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
-        coordinator.renew(mockPrimaryStateChangedEvent);
-        verify(dynamicDataSourceRule).restartHeartBeatJob(any());
-    }
-    
-    @Test
-    public void assertRenewInstanceStatus() {
-        Collection<String> testStates = new LinkedList<>();
-        testStates.add(StateType.OK.name());
-        StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), testStates);
-        coordinator.renew(mockStateEvent);
-        assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
-        testStates.add(StateType.CIRCUIT_BREAK.name());
-        coordinator.renew(mockStateEvent);
-        assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
-    }
-    
-    @Test
-    public void assertRenewInstanceLabels() {
-        Collection<String> labels = Collections.singleton("test");
-        coordinator.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
-        assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
-    }
-    
-    @Test
-    public void assertRenewInstanceOfflineEvent() {
-        coordinator.renew(new InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
-        assertThat(((ProxyInstanceMetaData) contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), is(3307));
-    }
-    
     @Test
     public void assertRenewDatabaseVersionChangedEvent() {
         when(persistService.getDataSourceService().load("db", "1")).thenReturn(getVersionChangedDataSourcePropertiesMap());
         when(persistService.getDatabaseRulePersistService().load("db", "1")).thenReturn(Collections.emptyList());
         Map<String, DataSource> dataSourceMap = initContextManager();
-        coordinator.renew(new DatabaseVersionChangedEvent("db", "1"));
+        subscriber.renew(new DatabaseVersionChangedEvent("db", "1"));
         assertThat(contextManager.getDataSourceMap("db").get("ds_0"), is(dataSourceMap.get("ds_0")));
         assertNotNull(contextManager.getDataSourceMap("db").get("ds_1"));
         assertThat(DataSourcePropertiesCreator.create(getChangedDataSource()), is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("ds_1"))));
@@ -261,73 +183,14 @@ public final class ContextManagerCoordinatorTest {
         assertThat(DataSourcePropertiesCreator.create(new MockedDataSource()), is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("primary_ds"))));
     }
     
-    @Test
-    public void assertRenewInstanceOnlineEvent() {
-        InstanceMetaData instanceMetaData1 = new ProxyInstanceMetaData("foo_instance_3307", 3307);
-        InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceMetaData1);
-        coordinator.renew(instanceOnlineEvent1);
-        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(1));
-        assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(), is(instanceMetaData1));
-        InstanceMetaData instanceMetaData2 = new ProxyInstanceMetaData("foo_instance_3308", 3308);
-        InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceMetaData2);
-        coordinator.renew(instanceOnlineEvent2);
-        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
-        assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData2));
-        coordinator.renew(instanceOnlineEvent1);
-        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
-        assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData1));
-    }
-    
     @Test
     public void assertRenewProperties() {
         Properties props = new Properties();
         props.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), Boolean.TRUE.toString());
-        coordinator.renew(new PropertiesChangedEvent(props));
+        subscriber.renew(new PropertiesChangedEvent(props));
         assertThat(contextManager.getMetaDataContexts().getMetaData().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()), is(Boolean.TRUE.toString()));
     }
     
-    @Test
-    public void assertCompleteUnitShowProcessList() {
-        String processListId = "foo_process_id";
-        ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
-        ShowProcessListManager.getInstance().getLocks().put(processListId, lock);
-        long startTime = System.currentTimeMillis();
-        ExecutorService executorService = Executors.newFixedThreadPool(1);
-        executorService.submit(() -> {
-            try {
-                Thread.sleep(50L);
-            } catch (final InterruptedException ignored) {
-            }
-            coordinator.completeUnitShowProcessList(new ShowProcessListUnitCompleteEvent(processListId));
-        });
-        lockAndAwaitDefaultTime(lock);
-        long currentTime = System.currentTimeMillis();
-        assertTrue(currentTime >= startTime + 50L);
-        assertTrue(currentTime <= startTime + 5000L);
-        ShowProcessListManager.getInstance().getLocks().remove(processListId);
-    }
-    
-    @Test
-    public void assertTriggerShowProcessList() throws NoSuchFieldException, IllegalAccessException {
-        String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
-        ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", mock(ExecuteProcessContext.class));
-        String processListId = "foo_process_id";
-        coordinator.triggerShowProcessList(new ShowProcessListTriggerEvent(instanceId, processListId));
-        ClusterPersistRepository repository = ReflectionUtil.getFieldValue(coordinator, "registryCenter", RegistryCenter.class).getRepository();
-        verify(repository).persist("/execution_nodes/foo_process_id/" + instanceId,
-                "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" + System.lineSeparator());
-        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_process_id");
-    }
-    
-    private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) {
-        lock.lock();
-        try {
-            lock.awaitDefaultTime();
-        } finally {
-            lock.unlock();
-        }
-    }
-    
     private Map<String, DataSource> initContextManager() {
         Map<String, DataSource> result = getDataSourceMap();
         ShardingSphereResourceMetaData resourceMetaData = new ShardingSphereResourceMetaData("sharding_db", result);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
new file mode 100644
index 00000000000..b7cab8eddf6
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ProcessListChangedSubscriberTest {
+    
+    private ProcessListChangedSubscriber subscriber;
+    
+    private ContextManager contextManager;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+                contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+        subscriber = new ProcessListChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+    }
+    
+    /**
+     * Create the builder parameter: "Cluster" mode on the "FIXTURE" repository type, no databases or rules,
+     * and a proxy instance with id "foo_instance_id" on port 3307.
+     *
+     * @return context manager builder parameter
+     */
+    private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+        ClusterPersistRepositoryConfiguration repositoryConfig = new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties());
+        return new ContextManagerBuilderParameter(new ModeConfiguration("Cluster", repositoryConfig),
+                Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), new ProxyInstanceMetaData("foo_instance_id", 3307), false);
+    }
+    
+    /**
+     * Stub the deep-stub database mock and register it under the name "db".
+     *
+     * @return single-entry map from "db" to the mocked database
+     */
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        // Resource meta data: no data sources, one MySQL storage type for "ds_0".
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+        // getSchemas() exposes a real empty schema, while getSchema("foo_schema") returns a separate mock.
+        when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        // Rule meta data: no rules and no rule configurations.
+        when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+        when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    
+    /**
+     * Assert that publishing a unit-complete event releases a thread awaiting on the lock registered for the process list id.
+     *
+     * <p>A worker thread fires the event after about 50 ms while the test thread awaits on the lock,
+     * so the elapsed time must be at least 50 ms and at most 5000 ms.</p>
+     */
+    @Test
+    public void assertCompleteUnitShowProcessList() {
+        String processListId = "foo_process_id";
+        ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
+        ShowProcessListManager.getInstance().getLocks().put(processListId, lock);
+        long startTime = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        try {
+            executorService.submit(() -> {
+                try {
+                    Thread.sleep(50L);
+                } catch (final InterruptedException ignored) {
+                    // Nothing interrupts this worker; if it ever happens the event is simply fired early.
+                }
+                subscriber.completeUnitShowProcessList(new ShowProcessListUnitCompleteEvent(processListId));
+            });
+            lockAndAwaitDefaultTime(lock);
+            long currentTime = System.currentTimeMillis();
+            assertTrue(currentTime >= startTime + 50L);
+            assertTrue(currentTime <= startTime + 5000L);
+        } finally {
+            // Release the pool thread and clean the singleton lock registry even when an assertion fails,
+            // so that neither a worker thread nor a stale lock leaks into other tests.
+            executorService.shutdown();
+            ShowProcessListManager.getInstance().getLocks().remove(processListId);
+        }
+    }
+    
+    /**
+     * Assert that a show-process-list trigger persists the collected process contexts under the execution node
+     * and then deletes the trigger node of this instance.
+     *
+     * @throws NoSuchFieldException if the subscriber has no "registryCenter" field
+     * @throws IllegalAccessException if the "registryCenter" field cannot be read reflectively
+     */
+    @Test
+    public void assertTriggerShowProcessList() throws NoSuchFieldException, IllegalAccessException {
+        String processListId = "foo_process_id";
+        String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
+        ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", mock(ExecuteProcessContext.class));
+        subscriber.triggerShowProcessList(new ShowProcessListTriggerEvent(instanceId, processListId));
+        RegistryCenter registryCenter = ReflectionUtil.getFieldValue(subscriber, "registryCenter", RegistryCenter.class);
+        ClusterPersistRepository repository = registryCenter.getRepository();
+        String expectedYaml = "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" + System.lineSeparator();
+        verify(repository).persist("/execution_nodes/foo_process_id/" + instanceId, expectedYaml);
+        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_process_id");
+    }
+    
+    /**
+     * Acquire the lock, await on it for its default time, and always release it afterwards.
+     *
+     * @param lock show process list lock to await on
+     */
+    private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) {
+        lock.lock();
+        try {
+            lock.awaitDefaultTime();
+        } finally {
+            // Unlock in finally so the lock is released even if awaiting throws.
+            lock.unlock();
+        }
+    }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
new file mode 100644
index 00000000000..43097de6d25
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.test.mock.MockedDataSource;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ResourceMetaDataChangedSubscriberTest {
+    
+    private ResourceMetaDataChangedSubscriber subscriber;
+    
+    private ContextManager contextManager;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MetaDataPersistService persistService;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+                contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+        subscriber = new ResourceMetaDataChangedSubscriber(contextManager);
+    }
+    
+    /**
+     * Create the builder parameter: "Cluster" mode on the "FIXTURE" repository type, no databases or rules,
+     * and a proxy instance with id "foo_instance_id" on port 3307.
+     *
+     * @return context manager builder parameter
+     */
+    private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+        ClusterPersistRepositoryConfiguration repositoryConfig = new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties());
+        return new ContextManagerBuilderParameter(new ModeConfiguration("Cluster", repositoryConfig),
+                Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), new ProxyInstanceMetaData("foo_instance_id", 3307), false);
+    }
+    
+    /**
+     * Stub the deep-stub database mock (named "db") and register it under the name "db".
+     *
+     * @return single-entry map from "db" to the mocked database
+     */
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        when(database.getName()).thenReturn("db");
+        // Resource meta data: no data sources, one MySQL storage type for "ds_0".
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+        // getSchemas() exposes a real empty schema, while getSchema("foo_schema") returns a separate mock.
+        when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        // Rule meta data: no rules and no rule configurations.
+        when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+        when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    
+    @Test
+    public void assertRenewForDatabaseAdded() {
+        when(persistService.getDataSourceService().load("db_added")).thenReturn(createDataSourcePropertiesMap());
+        when(persistService.getDatabaseRulePersistService().load("db_added")).thenReturn(Collections.emptyList());
+        subscriber.renew(new DatabaseAddedEvent("db_added"));
+        assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db_added").getResourceMetaData().getDataSources());
+    }
+    
+    /**
+     * Create data source properties for one primary and two replica data sources, all derived from one mocked data source.
+     *
+     * @return insertion-ordered map of data source name to properties
+     */
+    private Map<String, DataSourceProperties> createDataSourcePropertiesMap() {
+        MockedDataSource dataSource = new MockedDataSource();
+        Map<String, DataSourceProperties> result = new LinkedHashMap<>(3, 1);
+        // A separate properties object is created per name, as each entry must be independent.
+        for (String each : new String[]{"primary_ds", "replica_ds_0", "replica_ds_1"}) {
+            result.put(each, DataSourcePropertiesCreator.create(dataSource));
+        }
+        return result;
+    }
+    
+    @Test
+    public void assertRenewForDatabaseDeleted() {
+        subscriber.renew(new DatabaseDeletedEvent("db"));
+        assertNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"));
+    }
+    
+    @Test
+    public void assertRenewForSchemaAdded() {
+        subscriber.renew(new SchemaAddedEvent("db", "foo_schema"));
+        verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).putSchema(argThat(argument -> argument.equals("foo_schema")), any(ShardingSphereSchema.class));
+    }
+    
+    @Test
+    public void assertRenewForSchemaDeleted() {
+        when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("foo_schema")).thenReturn(true);
+        subscriber.renew(new SchemaDeletedEvent("db", "foo_schema"));
+        verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).removeSchema("foo_schema");
+    }
+    
+    /**
+     * Assert that a table-meta-data-changed event puts the changed table "t_order" into schema "db" of database "db".
+     */
+    @Test
+    public void assertRenewForTableMetaDataChangedChanged() {
+        ShardingSphereDatabase stubbedDatabase = contextManager.getMetaDataContexts().getMetaData().getDatabase("db");
+        when(stubbedDatabase.containsSchema("db")).thenReturn(true);
+        TableMetaDataChangedEvent event = new TableMetaDataChangedEvent(
+                "db", "db", new ShardingSphereTable("t_order", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), null);
+        subscriber.renew(event);
+        // Resolve the schema after the renewal, then verify the table was stored on it.
+        ShardingSphereSchema actualSchema = contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db");
+        verify(actualSchema).putTable("t_order", event.getChangedTableMetaData());
+    }
+    
+    /**
+     * Assert that a view-meta-data-changed event puts the changed view "t_order_view" into schema "db" of database "db".
+     */
+    @Test
+    public void assertRenewForViewMetaDataChanged() {
+        ShardingSphereDatabase stubbedDatabase = contextManager.getMetaDataContexts().getMetaData().getDatabase("db");
+        when(stubbedDatabase.containsSchema("db")).thenReturn(true);
+        ViewMetaDataChangedEvent event = new ViewMetaDataChangedEvent("db", "db", new ShardingSphereView("t_order_view", ""), null);
+        subscriber.renew(event);
+        // Resolve the schema after the renewal, then verify the view was stored on it.
+        ShardingSphereSchema actualSchema = contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db");
+        verify(actualSchema).putView("t_order_view", event.getChangedViewMetaData());
+    }
+}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
new file mode 100644
index 00000000000..ba5f3eecbb8
--- /dev/null
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.infra.state.StateType;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link StateChangedSubscriber}.
+ *
+ * <p>Every test drives one {@code renew} overload against a {@link ContextManager} prepared in {@link #setUp()}.
+ * The databases map handed to that context manager holds a single entry, {@code db}, backed by a deep-stub mock.</p>
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class StateChangedSubscriberTest {
+    
+    private StateChangedSubscriber subscriber;
+    
+    private ContextManager contextManager;
+    
+    // Deep stubs allow chained stubbing such as database.getRuleMetaData().getRules() without mocking every intermediate object.
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    
+    /**
+     * Build the context manager, swap in meta data contexts that expose the mocked database, and create the subscriber under test.
+     *
+     * @throws SQLException propagated from the context manager set-up calls
+     */
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        // Keep the persist service and global rule meta data produced by the builder, but replace the databases with the mocked one.
+        contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
+                contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
+        // NOTE(review): the last RegistryCenter argument is passed as null - confirm that none of the exercised handlers need it.
+        subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class),
+                new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+    }
+    
+    /**
+     * Create the builder parameter: {@code Cluster} mode with a {@code FIXTURE} repository and a proxy instance {@code foo_instance_id} on port 3307.
+     *
+     * @return context manager builder parameter
+     */
+    private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
+        ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+        InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+        return new ContextManagerBuilderParameter(modeConfig, Collections.emptyMap(), Collections.emptyList(), new Properties(), Collections.emptyList(), instanceMetaData, false);
+    }
+    
+    /**
+     * Stub the mocked database and register it under the name {@code db}.
+     *
+     * @return mutable map containing the single entry {@code db}
+     */
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new LinkedHashMap<>());
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
+        when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
+        when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        // Mutable on purpose: assertRenewPrimaryDataSourceName replaces the "db" entry through getDatabases().put(...).
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    
+    /**
+     * Snapshot the cluster instances and return the one at the given position.
+     *
+     * <p>Copying into a list keeps the test independent of the concrete collection type returned by {@code getAllClusterInstances()}.</p>
+     *
+     * @param index zero-based position in iteration order
+     * @return compute node instance at that position
+     */
+    private ComputeNodeInstance getClusterInstance(final int index) {
+        return new LinkedList<>(contextManager.getInstanceContext().getAllClusterInstances()).get(index);
+    }
+    
+    /**
+     * A {@link StorageNodeChangedEvent} must reach the static data source rule as a {@link StorageNodeDataSourceChangedEvent}
+     * that carries the same qualified database and the same storage node data source.
+     */
+    @Test
+    public void assertRenewForDisableStateChanged() {
+        StaticDataSourceContainedRule staticDataSourceRule = mock(StaticDataSourceContainedRule.class);
+        when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
+        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDatabase("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
+        subscriber.renew(event);
+        verify(staticDataSourceRule).updateStatus(argThat(
+                (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) argumentEvent -> Objects.equals(event.getQualifiedDatabase(), argumentEvent.getQualifiedDatabase())
+                        && Objects.equals(event.getDataSource(), argumentEvent.getDataSource())));
+    }
+    
+    /**
+     * A {@link PrimaryStateChangedEvent} must restart the heart beat job of the dynamic data source rule registered for database {@code db}.
+     */
+    @Test
+    public void assertRenewPrimaryDataSourceName() {
+        Collection<ShardingSphereRule> rules = new LinkedList<>();
+        DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class);
+        rules.add(dynamicDataSourceRule);
+        ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
+        // Named differently from the mocked field so the field is not shadowed inside this test.
+        ShardingSphereDatabase dynamicRuleDatabase = mock(ShardingSphereDatabase.class);
+        when(dynamicRuleDatabase.getRuleMetaData()).thenReturn(ruleMetaData);
+        contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", dynamicRuleDatabase);
+        PrimaryStateChangedEvent event = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
+        subscriber.renew(event);
+        verify(dynamicDataSourceRule).restartHeartBeatJob(any());
+    }
+    
+    /**
+     * A {@link StateEvent} for the current instance must leave it in {@code OK} when only {@code OK} is reported,
+     * and move it to {@code CIRCUIT_BREAK} once {@code CIRCUIT_BREAK} is reported as well.
+     */
+    @Test
+    public void assertRenewInstanceStatus() {
+        Collection<String> okStates = new LinkedList<>();
+        okStates.add(StateType.OK.name());
+        subscriber.renew(new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), okStates));
+        assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
+        // Send a separate event with its own collection instead of mutating the one already handed to the first event;
+        // the outcome must not depend on the event keeping a reference to the caller's collection.
+        Collection<String> circuitBreakStates = new LinkedList<>(okStates);
+        circuitBreakStates.add(StateType.CIRCUIT_BREAK.name());
+        subscriber.renew(new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), circuitBreakStates));
+        assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
+    }
+    
+    /**
+     * A {@link LabelsEvent} for the current instance must make the instance report the labels carried by the event.
+     */
+    @Test
+    public void assertRenewInstanceLabels() {
+        Collection<String> labels = Collections.singleton("test");
+        subscriber.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
+        assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
+    }
+    
+    /**
+     * An {@link InstanceOfflineEvent} for the current instance must not alter the current instance's own meta data.
+     */
+    @Test
+    public void assertRenewInstanceOfflineEvent() {
+        subscriber.renew(new InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
+        // NOTE(review): this only re-reads the port configured in createContextManagerBuilderParameter(), so it presumably passes
+        // whether or not the event is handled; consider asserting on getAllClusterInstances() once the offline semantics are confirmed.
+        assertThat(((ProxyInstanceMetaData) contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), is(3307));
+    }
+    
+    /**
+     * {@link InstanceOnlineEvent}s must append unseen instances in arrival order,
+     * and a repeated event for a known instance must keep the total count while moving that instance to the last position.
+     */
+    @Test
+    public void assertRenewInstanceOnlineEvent() {
+        InstanceMetaData instanceMetaData1 = new ProxyInstanceMetaData("foo_instance_3307", 3307);
+        InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceMetaData1);
+        subscriber.renew(instanceOnlineEvent1);
+        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(1));
+        assertThat(getClusterInstance(0).getMetaData(), is(instanceMetaData1));
+        InstanceMetaData instanceMetaData2 = new ProxyInstanceMetaData("foo_instance_3308", 3308);
+        InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceMetaData2);
+        subscriber.renew(instanceOnlineEvent2);
+        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
+        assertThat(getClusterInstance(1).getMetaData(), is(instanceMetaData2));
+        // Re-announcing the first instance: still two instances, with the first one now at the end.
+        subscriber.renew(instanceOnlineEvent1);
+        assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
+        assertThat(getClusterInstance(1).getMetaData(), is(instanceMetaData1));
+    }
+}