You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/06/01 03:32:33 UTC
[shardingsphere] branch master updated: Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 884d6ab10c9 Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)
884d6ab10c9 is described below
commit 884d6ab10c9a1399b38e81f2ab957a1ffb9cf96d
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Thu Jun 1 11:32:25 2023 +0800
Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module (#25964)
* Add subscribe to refresh readwrite-splitting configuration at readwrite-splitting module
* Remove unless code
---
.../ReadwriteSplittingConfigurationSubscriber.java | 106 +++++++++++++++++++++
.../ReadwriteSplittingLoadBalanceSubscriber.java | 96 +++++++++++++++++++
...nfra.rule.RuleConfigurationSubscribeCoordinator | 19 ++++
.../RuleConfigurationSubscribeCoordinator.java | 34 +++++++
.../ReadwriteSplittingNodeConverter.java | 6 --
.../mode/event/rule/FeatureEvent.java | 24 +++++
.../AddReadwriteSplittingConfigurationEvent.java | 34 +++++++
.../AlterReadwriteSplittingConfigurationEvent.java | 36 +++++++
...DeleteReadwriteSplittingConfigurationEvent.java | 34 +++++++
.../loadbalance/AddLoadBalanceEvent.java | 36 +++++++
.../loadbalance/AlterLoadBalanceEvent.java | 36 +++++++
.../loadbalance/DeleteLoadBalanceEvent.java | 34 +++++++
.../cluster/NewClusterContextManagerBuilder.java | 16 +++-
.../coordinator/registry/NewGovernanceWatcher.java | 58 +++++++++++
.../watcher/NewMetaDataChangedWatcher.java | 52 ++++++++++
...uster.coordinator.registry.NewGovernanceWatcher | 18 ++++
16 files changed, 628 insertions(+), 11 deletions(-)
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java
new file mode 100644
index 00000000000..6f7ed5e46d2
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingConfigurationSubscriber.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.readwritesplitting.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.AddReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.AlterReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration.DeleteReadwriteSplittingConfigurationEvent;
+import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Map;
+import java.util.LinkedList;
+
+/**
+ * Readwrite-splitting configuration subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@RequiredArgsConstructor
+public final class ReadwriteSplittingConfigurationSubscriber implements RuleConfigurationSubscribeCoordinator {
+
+ private Map<String, ShardingSphereDatabase> databases;
+
+ @Override
+ public void registerRuleConfigurationSubscriber(final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext) {
+ this.databases = databases;
+ instanceContext.getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew with add readwrite-splitting configuration.
+ *
+ * @param event add readwrite-splitting configuration event
+ */
+ @Subscribe
+ public synchronized void renew(final AddReadwriteSplittingConfigurationEvent event) {
+ ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+ ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig = YamlEngine.unmarshal(event.getData(), ReadwriteSplittingDataSourceRuleConfiguration.class);
+ Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations());
+ Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+ if (rule.isPresent()) {
+ ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration();
+ config.getDataSources().add(needToAddedConfig);
+ ruleConfigs.add(config);
+ } else {
+ ruleConfigs.add(new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAddedConfig), Collections.emptyMap()));
+ }
+ database.getRuleMetaData().getConfigurations().addAll(ruleConfigs);
+ }
+
+ /**
+ * Renew with alter readwrite-splitting configuration.
+ *
+ * @param event alter readwrite-splitting configuration event
+ */
+ @Subscribe
+ public synchronized void renew(final AlterReadwriteSplittingConfigurationEvent event) {
+ ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+ ReadwriteSplittingDataSourceRuleConfiguration needToAlteredConfig = YamlEngine.unmarshal(event.getData(), ReadwriteSplittingDataSourceRuleConfiguration.class);
+ Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations());
+ ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+ config.getDataSources().removeIf(each -> each.getName().equals(event.getGroupName()));
+ ruleConfigs.add(new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAlteredConfig), Collections.emptyMap()));
+ database.getRuleMetaData().getConfigurations().addAll(ruleConfigs);
+ }
+
+ /**
+ * Renew with delete readwrite-splitting configuration.
+ *
+ * @param event delete readwrite-splitting configuration event
+ */
+ @Subscribe
+ public synchronized void renew(final DeleteReadwriteSplittingConfigurationEvent event) {
+ ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+ Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations());
+ ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+ config.getDataSources().removeIf(each -> each.getName().equals(event.getGroupName()));
+ ruleConfigs.add(config);
+ database.getRuleMetaData().getConfigurations().addAll(ruleConfigs);
+ }
+}
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
new file mode 100644
index 00000000000..a9007d46e05
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.readwritesplitting.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.AddLoadBalanceEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.AlterLoadBalanceEvent;
+import org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance.DeleteLoadBalanceEvent;
+import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Readwrite-splitting load-balance subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@RequiredArgsConstructor
+public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleConfigurationSubscribeCoordinator {
+
+ private Map<String, ShardingSphereDatabase> databases;
+
+ @Override
+ public void registerRuleConfigurationSubscriber(final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext) {
+ this.databases = databases;
+ instanceContext.getEventBusContext().register(this);
+ }
+
+ /**
+ * Renew with add load-balance.
+ *
+ * @param event add load-balance event
+ */
+ @Subscribe
+ public synchronized void renew(final AddLoadBalanceEvent event) {
+ renew(event.getDatabaseName(), event.getLoadBalanceName(), event.getData());
+ }
+
+ /**
+ * Renew with alter load-balance.
+ *
+ * @param event alter load-balance event
+ */
+ @Subscribe
+ public synchronized void renew(final AlterLoadBalanceEvent event) {
+ renew(event.getDatabaseName(), event.getLoadBalanceName(), event.getData());
+ }
+
+ private void renew(final String databaseName, final String loadBalanceName, final String data) {
+ ShardingSphereDatabase database = databases.get(databaseName);
+ Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations());
+ ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+ config.getLoadBalancers().put(loadBalanceName, YamlEngine.unmarshal(data, AlgorithmConfiguration.class));
+ ruleConfigs.add(config);
+ database.getRuleMetaData().getConfigurations().addAll(ruleConfigs);
+ }
+
+ /**
+ * Renew with delete load-balance.
+ *
+ * @param event delete load-balance event
+ */
+ @Subscribe
+ public synchronized void renew(final DeleteLoadBalanceEvent event) {
+ ShardingSphereDatabase database = databases.get(event.getDatabaseName());
+ Collection<RuleConfiguration> ruleConfigs = new LinkedList<>(database.getRuleMetaData().getConfigurations());
+ ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
+ config.getLoadBalancers().remove(event.getLoadBalanceName());
+ ruleConfigs.add(config);
+ database.getRuleMetaData().getConfigurations().addAll(ruleConfigs);
+ }
+}
diff --git a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator
new file mode 100644
index 00000000000..9e8adc24813
--- /dev/null
+++ b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.readwritesplitting.subscriber.ReadwriteSplittingConfigurationSubscriber
+org.apache.shardingsphere.readwritesplitting.subscriber.ReadwriteSplittingLoadBalanceSubscriber
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java
new file mode 100644
index 00000000000..2079f40ef44
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/RuleConfigurationSubscribeCoordinator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule;
+
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.util.Map;
+
+public interface RuleConfigurationSubscribeCoordinator {
+
+ /**
+ * Register rule configuration subscriber.
+ *
+ * @param databases databases
+ * @param instanceContext instance context
+ */
+ void registerRuleConfigurationSubscriber(Map<String, ShardingSphereDatabase> databases, InstanceContext instanceContext);
+}
diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
index 90b01188618..cc08d858b4e 100644
--- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
+++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/metadata/config/readwritesplitting/ReadwriteSplittingNodeConverter.java
@@ -22,12 +22,6 @@ package org.apache.shardingsphere.metadata.persist.node.metadata.config.readwrit
*/
public final class ReadwriteSplittingNodeConverter {
- private static final String ROOT_NODE = "readwrite_splitting";
-
- private static final String ACTIVE_VERSION_NODE = "/active_version";
-
- private static final String VERSIONS_NODE = "/versions";
-
private static final String LOAD_BALANCER_NODE = "load_balancers";
/**
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java
new file mode 100644
index 00000000000..750d4728213
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/FeatureEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule;
+
+/**
+ * Feature event.
+ */
+public interface FeatureEvent {
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..89f52e7bdf2
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AddReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Add readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AddReadwriteSplittingConfigurationEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String data;
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..c901790f048
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/AlterReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * alter readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AlterReadwriteSplittingConfigurationEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String groupName;
+
+ private final String data;
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java
new file mode 100644
index 00000000000..1cccf4b737f
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/configuration/DeleteReadwriteSplittingConfigurationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.configuration;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * delete readwrite-splitting configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DeleteReadwriteSplittingConfigurationEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String groupName;
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java
new file mode 100644
index 00000000000..4f80780ca6d
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AddLoadBalanceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * Add load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AddLoadBalanceEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String loadBalanceName;
+
+ private final String data;
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java
new file mode 100644
index 00000000000..070d41ccc89
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/AlterLoadBalanceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * alter load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class AlterLoadBalanceEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String loadBalanceName;
+
+ private final String data;
+}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java
new file mode 100644
index 00000000000..24c383de5f7
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/rule/readwritesplitting/loadbalance/DeleteLoadBalanceEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.rule.readwritesplitting.loadbalance;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.rule.FeatureEvent;
+
+/**
+ * delete load-balance event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DeleteLoadBalanceEvent implements FeatureEvent {
+
+ private final String databaseName;
+
+ private final String loadBalanceName;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
index 1952636f39f..b59508afda3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/NewClusterContextManagerBuilder.java
@@ -21,7 +21,9 @@ import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.rule.RuleConfigurationSubscribeCoordinator;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.metadata.persist.NewMetaDataPersistService;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
@@ -31,7 +33,6 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.NewContextManagerSubscriberFacade;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.NewMetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -40,7 +41,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import java.sql.SQLException;
/**
- * TODO replace the old implementation after meta data refactor completed
+ * TODO Rename ClusterContextManagerBuilder when metadata structure adjustment completed. #25485
* New cluster context manager builder.
*/
public final class NewClusterContextManagerBuilder implements ContextManagerBuilder {
@@ -57,7 +58,7 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
MetaDataContexts metaDataContexts = NewMetaDataContextsFactory.create(persistService, param, instanceContext, registryCenter.getStorageNodeStatusService().loadStorageNodes());
ContextManager result = new ContextManager(metaDataContexts, instanceContext);
setContextManagerAware(result);
- registerOnline(persistService, registryCenter, param, result);
+ registerOnline(registryCenter, param, result);
return result;
}
@@ -77,11 +78,11 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
((ContextManagerAware) contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager);
}
- private void registerOnline(final NewMetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
+ private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
loadClusterStatus(registryCenter, contextManager);
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
- new NewContextManagerSubscriberFacade(persistService, registryCenter, contextManager);
+ registerRuleConfigurationSubscribers(contextManager.getMetaDataContexts(), contextManager.getInstanceContext());
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
}
@@ -90,6 +91,11 @@ public final class NewClusterContextManagerBuilder implements ContextManagerBuil
contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
}
+ private void registerRuleConfigurationSubscribers(final MetaDataContexts metaDataContexts, final InstanceContext instanceContext) {
+ ShardingSphereServiceLoader.getServiceInstances(RuleConfigurationSubscribeCoordinator.class)
+ .forEach(each -> each.registerRuleConfigurationSubscriber(metaDataContexts.getMetaData().getDatabases(), instanceContext));
+ }
+
@Override
public String getType() {
return "New_Cluster";
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java
new file mode 100644
index 00000000000..a62ebaeda4a
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/NewGovernanceWatcher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Collection;
+import java.util.Optional;
+
+// TODO Rename GovernanceWatcher when metadata structure adjustment completed. #25485
+/**
+ * Governance watcher.
+ *
+ * @param <T> type of event
+ */
+@SingletonSPI
+public interface NewGovernanceWatcher<T> {
+
+ /**
+ * Get watching keys.
+ *
+ * @param databaseName database name
+ * @return watching keys
+ */
+ Collection<String> getWatchingKeys(String databaseName);
+
+ /**
+ * Get watching types.
+ *
+ * @return watching types
+ */
+ Collection<Type> getWatchingTypes();
+
+ /**
+ * Create governance event.
+ *
+ * @param event registry center data changed event
+ * @return governance event
+ */
+ Optional<T> createGovernanceEvent(DataChangedEvent event);
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
new file mode 100644
index 00000000000..3cc485c9a8e
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/NewMetaDataChangedWatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher;
+
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Arrays;
+
+/**
+ * TODO Rename MetaDataChangedWatcher when metadata structure adjustment completed. #25485
+ * Meta data changed watcher.
+ */
+public final class NewMetaDataChangedWatcher implements NewGovernanceWatcher<GovernanceEvent> {
+
+ @Override
+ public Collection<String> getWatchingKeys(final String databaseName) {
+ return null == databaseName ? Collections.singleton(DatabaseMetaDataNode.getMetaDataNodePath())
+ : Collections.singleton(DatabaseMetaDataNode.getDatabaseNamePath(databaseName));
+ }
+
+ @Override
+ public Collection<Type> getWatchingTypes() {
+ return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+ }
+
+ @Override
+ public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
+ return Optional.empty();
+ }
+}
diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher
new file mode 100644
index 00000000000..3128c6eb3dd
--- /dev/null
+++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.NewGovernanceWatcher
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.NewMetaDataChangedWatcher