You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/06/27 03:21:51 UTC
[shardingsphere] branch master updated: Refactor readwrite-splitting subscriber (#26602)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4daa41cd3 Refactor readwrite-splitting subscriber (#26602)
5d4daa41cd3 is described below
commit 5d4daa41cd3e227bab84eaa09020fcdbe01e0daa
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Jun 27 11:21:44 2023 +0800
Refactor readwrite-splitting subscriber (#26602)
* Refactor readwrite subscriber
* Update code style
* Update
---
.../ReadwriteSplittingDataSourceSubscriber.java | 35 +++++++++++++++-------
.../ReadwriteSplittingLoadBalanceSubscriber.java | 31 ++++++++++++++++---
2 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
index 8972a1ac8b7..d990f53c575 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
@@ -37,6 +37,8 @@ import org.apache.shardingsphere.readwritesplitting.yaml.config.rule.YamlReadwri
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
+import java.util.Collection;
+import java.util.LinkedList;
/**
* Readwrite-splitting configuration subscriber.
@@ -59,19 +61,10 @@ public final class ReadwriteSplittingDataSourceSubscriber implements RuleChanged
if (!event.getActiveVersion().equals(instanceContext.getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
return;
}
-
- ShardingSphereDatabase database = databases.get(event.getDatabaseName());
ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig = swapDataSource(event.getGroupName(),
instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion()));
- Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
- ReadwriteSplittingRuleConfiguration config;
- if (rule.isPresent()) {
- config = (ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration();
- config.getDataSources().add(needToAddedConfig);
- } else {
- config = new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAddedConfig), Collections.emptyMap());
- }
- instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
+ instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(),
+ getConfig(databases.get(event.getDatabaseName()), needToAddedConfig)));
}
/**
@@ -106,6 +99,26 @@ public final class ReadwriteSplittingDataSourceSubscriber implements RuleChanged
instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
}
+ private ReadwriteSplittingRuleConfiguration getConfig(final ShardingSphereDatabase database, final ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig) {
+ Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+ if (rule.isPresent()) {
+ return getConfig((ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration(), needToAddedConfig);
+ }
+ Collection<ReadwriteSplittingDataSourceRuleConfiguration> dataSourceConfigs = new LinkedList<>();
+ dataSourceConfigs.add(needToAddedConfig);
+ return new ReadwriteSplittingRuleConfiguration(dataSourceConfigs, Collections.emptyMap());
+ }
+
+ private ReadwriteSplittingRuleConfiguration getConfig(final ReadwriteSplittingRuleConfiguration result, final ReadwriteSplittingDataSourceRuleConfiguration dataSourceRuleConfig) {
+ if (null == result.getDataSources()) {
+ Collection<ReadwriteSplittingDataSourceRuleConfiguration> dataSources = new LinkedList<>();
+ dataSources.add(dataSourceRuleConfig);
+ return new ReadwriteSplittingRuleConfiguration(dataSources, result.getLoadBalancers());
+ }
+ result.getDataSources().add(dataSourceRuleConfig);
+ return result;
+ }
+
private ReadwriteSplittingDataSourceRuleConfiguration swapDataSource(final String name, final String yamlContext) {
YamlReadwriteSplittingDataSourceRuleConfiguration yamlDataSourceRuleConfig = YamlEngine.unmarshal(yamlContext, YamlReadwriteSplittingDataSourceRuleConfiguration.class);
return new ReadwriteSplittingDataSourceRuleConfiguration(name, yamlDataSourceRuleConfig.getWriteDataSourceName(), yamlDataSourceRuleConfig.getReadDataSourceNames(),
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
index 0160aff21d2..570ed6681b7 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
@@ -32,7 +32,10 @@ import org.apache.shardingsphere.readwritesplitting.event.loadbalance.AlterLoadB
import org.apache.shardingsphere.readwritesplitting.event.loadbalance.DeleteLoadBalanceEvent;
import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
/**
* Readwrite-splitting load-balance subscriber.
@@ -55,10 +58,10 @@ public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleChange
if (!event.getActiveVersion().equals(instanceContext.getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
return;
}
- ShardingSphereDatabase database = databases.get(event.getDatabaseName());
- ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
- config.getLoadBalancers().put(event.getLoadBalanceName(),
- swapToAlgorithmConfig(instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion())));
+ AlgorithmConfiguration needToAltered =
+ swapToAlgorithmConfig(instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion()));
+ instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(),
+ getConfig(databases.get(event.getDatabaseName()), event.getLoadBalanceName(), needToAltered)));
}
/**
@@ -74,6 +77,26 @@ public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleChange
instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
}
+ private ReadwriteSplittingRuleConfiguration getConfig(final ShardingSphereDatabase database, final String loadBalanceName, final AlgorithmConfiguration needToAltered) {
+ Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+ if (rule.isPresent()) {
+ return getConfig((ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration(), loadBalanceName, needToAltered);
+ }
+ Map<String, AlgorithmConfiguration> loadBalancers = new LinkedHashMap<>();
+ loadBalancers.put(loadBalanceName, needToAltered);
+ return new ReadwriteSplittingRuleConfiguration(Collections.emptyList(), loadBalancers);
+ }
+
+ private ReadwriteSplittingRuleConfiguration getConfig(final ReadwriteSplittingRuleConfiguration result, final String loadBalanceName, final AlgorithmConfiguration needToAltered) {
+ if (null == result.getLoadBalancers()) {
+ Map<String, AlgorithmConfiguration> loadBalancers = new LinkedHashMap<>();
+ loadBalancers.put(loadBalanceName, needToAltered);
+ return new ReadwriteSplittingRuleConfiguration(result.getDataSources(), loadBalancers);
+ }
+ result.getLoadBalancers().put(loadBalanceName, needToAltered);
+ return result;
+ }
+
private AlgorithmConfiguration swapToAlgorithmConfig(final String yamlContext) {
return new YamlAlgorithmConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContext, YamlAlgorithmConfiguration.class));
}