You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/06/01 03:32:33 UTC

[shardingsphere] branch master updated: Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 884d6ab10c9 Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)
884d6ab10c9 is described below

commit 884d6ab10c9a1399b38e81f2ab957a1ffb9cf96d
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Thu Jun 1 11:32:25 2023 +0800

    Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)
    
    * Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module
    
    * Remove useless code
---
 .../ReadwriteSplittingConfigurationSubscriber.java | 106 +++++++++++++++++++++
 .../ReadwriteSplittingLoadBalanceSubscriber.java   |  96 +++++++++++++++++++
 ...nfra.rule.RuleConfigurationSubscribeCoordinator |  19 ++++
 .../RuleConfigurationSubscribeCoordinator.java     |  34 +++++++
 .../ReadwriteSplittingNodeConverter.java           |   6 --
 .../mode/event/rule/FeatureEvent.java              |  24 +++++
 .../AddReadwriteSplittingConfigurationEvent.java   |  34 +++++++
 .../AlterReadwriteSplittingConfigurationEvent.java |  36 +++++++
 ...DeleteReadwriteSplittingConfigurationEvent.java |  34 +++++++
 .../loadbalance/AddLoadBalanceEvent.java           |  36 +++++++
 .../loadbalance/AlterLoadBalanceEvent.java         |  36 +++++++
 .../loadbalance/DeleteLoadBalanceEvent.java        |  34 +++++++
 .../cluster/NewClusterContextManagerBuilder.java   |  16 +++-
 .../coordinator/registry/NewGovernanceWatcher.java |  58 +++++++++++
 .../watcher/NewMetaDataChangedWatcher.java         |  52 ++++++++++
 ...uster.coordinator.registry.NewGovernanceWatcher |  18 ++++
 16 files changed, 628 insertions(+), 11 deletions(-)

diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java
new file mode 100644
index 00000000000..6f7ed5e46d2
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.readwritesplitting.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.AddReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.AlterReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.DeleteReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Map;
+import java.util.LinkedList;
+
+/**
+ * Readwrite-splitting configuration subscriber, handling add, alter and delete configuration events from the event bus it registers on.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@RequiredArgsConstructor // no final fields here, so Lombok generates a no-args constructor
+public final class ReadwriteSplittingConfigurationSubscriber implements RuleConfigurationSubscribeCoordinator {
+    
+    private Map<String, ShardingSphereDatabase> databases; // assigned in registerRuleConfigurationSubscriber; looked up by the event's database name
+    
+    @Override
+    public void registerRuleConfigurationSubscriber(final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext) {
+        this.databases = databases;
+        instanceContext.getEventBusContext().register(this); // presumably makes the @Subscribe methods below receive events - confirm against EventBusContext
+    }
+    
+    /**
+     * Renew with an added readwrite-splitting data source configuration.
+     *
+     * @param event add readwrite-splitting configuration event
+     */
+    @Subscribe
+    public synchronized void renew(final AddReadwriteSplittingConfigurationEvent event) {
+        ShardingSphereDatabase database = databases.get(event.getDatabaseName()); // NOTE(review): no null check; a name missing from the map fails on the next dereference
+        ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig = YamlEngine.unmarshal(event.getData(), ReadwriteSplittingDataSourceRuleConfiguration.class);
+        Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations()); // copy of the current rule configurations
+        Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+        if (rule.isPresent()) {
+            ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration();
+            config.getDataSources().add(needToAddedConfig); // mutates the existing rule's configuration in place
+            ruleConfigs.add(config); // NOTE(review): config may already be in the copy taken above, so it could be listed twice - confirm
+        } else {
+            ruleConfigs.add(new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAddedConfig), Collections.emptyMap()));
+        }
+        database.getRuleMetaData().getConfigurations().addAll(ruleConfigs); // NOTE(review): adds the copy back to the collection it was copied from; effect depends on what getConfigurations() returns - confirm
+    }
+    
+    /**
+     * Renew with an altered readwrite-splitting data source configuration.
+     *
+     * @param event alter readwrite-splitting configuration event
+     */
+    @Subscribe
+    public synchronized void renew(final AlterReadwriteSplittingConfigurationEvent event) {
+        ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+        ReadwriteSplittingDataSourceRuleConfiguration needToAlteredConfig = YamlEngine.unmarshal(event.getData(), ReadwriteSplittingDataSourceRuleConfiguration.class);
+        Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations()); // copy of the current rule configurations
+        ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration(); // unlike the add handler, no branch for an absent rule
+        config.getDataSources().removeIf(each -> each.getName().equals(event.getGroupName())); // drops the group being altered from the existing configuration
+        ruleConfigs.add(new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAlteredConfig), Collections.emptyMap())); // NOTE(review): altered group goes into a new configuration built with an empty map, not back into config - confirm intent
+        database.getRuleMetaData().getConfigurations().addAll(ruleConfigs); // NOTE(review): same copy-then-addAll pattern as the add handler - confirm
+    }
+    
+    /**
+     * Renew with a deleted readwrite-splitting data source configuration.
+     *
+     * @param event delete readwrite-splitting configuration event
+     */
+    @Subscribe
+    public synchronized void renew(final DeleteReadwriteSplittingConfigurationEvent event) {
+        ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+        Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations()); // copy of the current rule configurations
+        ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+        config.getDataSources().removeIf(each -> each.getName().equals(event.getGroupName())); // removes every data source configuration whose name equals the group name
+        ruleConfigs.add(config);
+        database.getRuleMetaData().getConfigurations().addAll(ruleConfigs); // NOTE(review): same copy-then-addAll pattern as the add handler - confirm
+    }
+}
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
new file mode 100644
index 00000000000..a9007d46e05
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.readwritesplitting.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.AddLoadBalanceEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.AlterLoadBalanceEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.DeleteLoadBalanceEvent;
+import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Readwrite-splitting load-balance subscriber, handling add, alter and delete load-balance events from the event bus it registers on.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@RequiredArgsConstructor // no final fields here, so Lombok generates a no-args constructor
+public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleConfigurationSubscribeCoordinator {
+    
+    private Map<String, ShardingSphereDatabase> databases; // assigned in registerRuleConfigurationSubscriber; looked up by the event's database name
+    
+    @Override
+    public void registerRuleConfigurationSubscriber(final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext) {
+        this.databases = databases;
+        instanceContext.getEventBusContext().register(this); // presumably makes the @Subscribe methods below receive events - confirm against EventBusContext
+    }
+    
+    /**
+     * Renew with an added load-balance algorithm.
+     *
+     * @param event add load-balance event
+     */
+    @Subscribe
+    public synchronized void renew(final AddLoadBalanceEvent event) {
+        renew(event.getDatabaseName(), event.getLoadBalanceName(), event.getData());
+    }
+    
+    /**
+     * Renew with an altered load-balance algorithm.
+     *
+     * @param event alter load-balance event
+     */
+    @Subscribe
+    public synchronized void renew(final AlterLoadBalanceEvent event) {
+        renew(event.getDatabaseName(), event.getLoadBalanceName(), event.getData());
+    }
+    
+    private void renew(final String databaseName, final String loadBalanceName, final String data) { // shared by the add and alter handlers, which are its only callers in this class
+        ShardingSphereDatabase database = databases.get(databaseName); // NOTE(review): no null check; a name missing from the map fails on the next dereference
+        Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations()); // copy of the current rule configurations
+        ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+        config.getLoadBalancers().put(loadBalanceName, YamlEngine.unmarshal(data, AlgorithmConfiguration.class)); // stores the unmarshalled algorithm under the load-balance name for both add and alter
+        ruleConfigs.add(config); // NOTE(review): config may already be in the copy taken above, so it could be listed twice - confirm
+        database.getRuleMetaData().getConfigurations().addAll(ruleConfigs); // NOTE(review): adds the copy back to the collection it was copied from; effect depends on what getConfigurations() returns - confirm
+    }
+    
+    /**
+     * Renew with a deleted load-balance algorithm.
+     *
+     * @param event delete load-balance event
+     */
+    @Subscribe
+    public synchronized void renew(final DeleteLoadBalanceEvent event) {
+        ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+        Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations()); // copy of the current rule configurations
+        ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+        config.getLoadBalancers().remove(event.getLoadBalanceName()); // removes the entry stored under the load-balance name
+        ruleConfigs.add(config);
+        database.getRuleMetaData().getConfigurations().addAll(ruleConfigs); // NOTE(review): same copy-then-addAll pattern as the private renew helper - confirm
+    }
+}
diff --git a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator
new file mode 100644
index 00000000000..9e8adc24813
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.readwritesplitting.subscriber.ReadwriteSplittingConfigurationSubscriber
+org.apache.shardingsphere.readwritesplitting.subscriber.ReadwriteSplittingLoadBalanceSubscriber
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java
new file mode 100644
index 00000000000..2079f40ef44
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule;
+
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.util.Map;
+
+public interface RuleConfigurationSubscribeCoordinator { // the readwrite-splitting implementations are listed in a META-INF/services file named after this interface
+    
+    /**
+     * Register rule configuration subscriber.
+     *
+     * @param databases databases; the readwrite-splitting implementations look entries up by database name
+     * @param instanceContext instance context; the readwrite-splitting implementations register themselves on its event bus context
+     */
+    void registerRuleConfigurationSubscriber(Map<String, ShardingSphereDatabase> databases, InstanceContext instanceContext);
+}
diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
index 90b01188618..cc08d858b4e 100644
--- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
+++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
@@ -22,12 +22,6 @@ package org.apache.shardingsphere.metadata.persist.node.metadata.config.readwrit
  */
 public final class ReadwriteSplittingNodeConverter {
     
-    private static final String ROOT_NODE = "readwrite_splitting";
-    
-    private static final String ACTIVE_VERSION_NODE = "/active_version";
-    
-    private static final String VERSIONS_NODE = "/versions";
-    
     private static final String LOAD_BALANCER_NODE = "load_balancers";
     
     /**
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java
new file mode 100644
index 00000000000..750d4728213
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule;
+
+/**
+ * Feature event; declares no methods and is implemented by the readwrite-splitting rule event classes.
+ */
+public interface FeatureEvent {
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..89f52e7bdf2
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Add readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AddReadwriteSplittingConfigurationEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the readwrite-splitting subscriber to look up the target database
+    
+    private final String data; // unmarshalled by the subscriber via YamlEngine into a ReadwriteSplittingDataSourceRuleConfiguration
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..c901790f048
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Alter readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AlterReadwriteSplittingConfigurationEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the readwrite-splitting subscriber to look up the target database
+    
+    private final String groupName; // compared by the subscriber with data source configuration names to find the entry being altered
+    
+    private final String data; // unmarshalled by the subscriber via YamlEngine into a ReadwriteSplittingDataSourceRuleConfiguration
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..1cccf4b737f
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Delete readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DeleteReadwriteSplittingConfigurationEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the readwrite-splitting subscriber to look up the target database
+    
+    private final String groupName; // compared by the subscriber with data source configuration names to find the entry to remove
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java
new file mode 100644
index 00000000000..4f80780ca6d
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Add load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AddLoadBalanceEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the load-balance subscriber to look up the target database
+    
+    private final String loadBalanceName; // key under which the subscriber stores the algorithm in the configuration's load balancers
+    
+    private final String data; // unmarshalled by the subscriber via YamlEngine into an AlgorithmConfiguration
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java
new file mode 100644
index 00000000000..070d41ccc89
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Alter load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AlterLoadBalanceEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the load-balance subscriber to look up the target database
+    
+    private final String loadBalanceName; // key under which the subscriber stores the algorithm in the configuration's load balancers
+    
+    private final String data; // unmarshalled by the subscriber via YamlEngine into an AlgorithmConfiguration
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java
new file mode 100644
index 00000000000..24c383de5f7
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Delete load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DeleteLoadBalanceEvent implements FeatureEvent {
+    
+    private final String databaseName; // used by the load-balance subscriber to look up the target database
+    
+    private final String loadBalanceName; // key the subscriber removes from the configuration's load balancers
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
index 1952636f39f..b59508afda3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
@@ -21,7 +21,9 @@ import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.metadata.persist.NewMetaDataPersistService;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
@@ -31,7 +33,6 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.NewContextManagerSubscriberFacade;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.NewMetaDataContextsFactory;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -40,7 +41,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import java.sql.SQLException;
 
 /**
- * TODO replace the old implementation after meta data refactor completed
+ * TODO Rename to ClusterContextManagerBuilder when metadata structure adjustment is completed. #25485
  * New cluster context manager builder.
  */
 public final class NewClusterContextManagerBuilder implements ContextManagerBuilder {
@@ -57,7 +58,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         MetaDataContexts metaDataContexts = NewMetaDataContextsFactory.create(persistService, param, instanceContext, registryCenter.getStorageNodeStatusService().loadStorageNodes());
         ContextManager result = new ContextManager(metaDataContexts, instanceContext);
         setContextManagerAware(result);
-        registerOnline(persistService, registryCenter, param, result);
+        registerOnline(registryCenter, param, result);
         return result;
     }
     
@@ -77,11 +78,11 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         ((ContextManagerAware) contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager);
     }
     
-    private void registerOnline(final NewMetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
+    private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
         loadClusterStatus(registryCenter, contextManager);
         contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
         contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
-        new NewContextManagerSubscriberFacade(persistService, registryCenter, contextManager);
+        registerRuleConfigurationSubscribers(contextManager.getMetaDataContexts(), contextManager.getInstanceContext());
         registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
     }
     
@@ -90,6 +91,11 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
         contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
     }
     
+    private void registerRuleConfigurationSubscribers(final MetaDataContexts metaDataContexts, final InstanceContext instanceContext) {
+        ShardingSphereServiceLoader.getServiceInstances(RuleConfigurationSubscribeCoordinator.class)
+                .forEach(each -> each.registerRuleConfigurationSubscriber(metaDataContexts.getMetaData().getDatabases(), instanceContext));
+    }
+    
     @Override
     public String getType() {
         return "New_Cluster";
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java
new file mode 100644
index 00000000000..a62ebaeda4a
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Collection;
+import java.util.Optional;
+
+// TODO Rename to GovernanceWatcher when metadata structure adjustment is completed. #25485
+/**
+ * Governance watcher.
+ *
+ * @param <T> type of governance event created by this watcher
+ */
+@SingletonSPI
+public interface NewGovernanceWatcher<T> {
+    
+    /**
+     * Get watching keys.
+     *
+     * @param databaseName database name
+     * @return watching keys
+     */
+    Collection<String> getWatchingKeys(String databaseName);
+    
+    /**
+     * Get watching types.
+     *
+     * @return watching types
+     */
+    Collection<Type> getWatchingTypes();
+    
+    /**
+     * Create governance event.
+     *
+     * @param event registry center data changed event
+     * @return governance event, or empty if no event is created
+     */
+    Optional<T> createGovernanceEvent(DataChangedEvent event);
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
new file mode 100644
index 00000000000..3cc485c9a8e
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Arrays;
+
+/**
+ * TODO Rename to MetaDataChangedWatcher when metadata structure adjustment is completed. #25485
+ * Meta data changed watcher.
+ */
+public final class NewMetaDataChangedWatcher implements NewGovernanceWatcher<GovernanceEvent> {
+    
+    @Override
+    public Collection<String> getWatchingKeys(final String databaseName) {
+        return null == databaseName ? Collections.singleton(DatabaseMetaDataNode.getMetaDataNodePath())
+                : Collections.singleton(DatabaseMetaDataNode.getDatabaseNamePath(databaseName));
+    }
+    
+    @Override
+    public Collection<Type> getWatchingTypes() {
+        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+    }
+    
+    @Override
+    public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
+        return Optional.empty();
+    }
+}
diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher
new file mode 100644
index 00000000000..3128c6eb3dd
--- /dev/null
+++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.NewMetaDataChangedWatcher