You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2023/06/27 03:21:51 UTC

[shardingsphere] branch master updated: Refactor readwrite-splitting subscriber (#26602)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4daa41cd3 Refactor readwrite-splitting subscriber (#26602)
5d4daa41cd3 is described below

commit 5d4daa41cd3e227bab84eaa09020fcdbe01e0daa
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Jun 27 11:21:44 2023 +0800

    Refactor readwrite-splitting subscriber (#26602)
    
    * Refactor readwrite subscriber
    
    * Update code style
    
    * Update
---
 .../ReadwriteSplittingDataSourceSubscriber.java    | 35 +++++++++++++++-------
 .../ReadwriteSplittingLoadBalanceSubscriber.java   | 31 ++++++++++++++++---
 2 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
index 8972a1ac8b7..d990f53c575 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingDataSourceSubscriber.java
@@ -37,6 +37,8 @@ import org.apache.shardingsphere.readwritesplitting.yaml.config.rule.YamlReadwri
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Collection;
+import java.util.LinkedList;
 
 /**
  * Readwrite-splitting configuration subscriber.
@@ -59,19 +61,10 @@ public final class ReadwriteSplittingDataSourceSubscriber implements RuleChanged
         if (!event.getActiveVersion().equals(instanceContext.getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
             return;
         }
-        
-        ShardingSphereDatabase database = databases.get(event.getDatabaseName());
         ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig = swapDataSource(event.getGroupName(),
                 instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion()));
-        Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
-        ReadwriteSplittingRuleConfiguration config;
-        if (rule.isPresent()) {
-            config = (ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration();
-            config.getDataSources().add(needToAddedConfig);
-        } else {
-            config = new ReadwriteSplittingRuleConfiguration(Collections.singletonList(needToAddedConfig), Collections.emptyMap());
-        }
-        instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
+        instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(),
+                getConfig(databases.get(event.getDatabaseName()), needToAddedConfig)));
     }
     
     /**
@@ -106,6 +99,26 @@ public final class ReadwriteSplittingDataSourceSubscriber implements RuleChanged
         instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
     }
     
+    private ReadwriteSplittingRuleConfiguration getConfig(final ShardingSphereDatabase database, final ReadwriteSplittingDataSourceRuleConfiguration needToAddedConfig) {
+        Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+        if (rule.isPresent()) {
+            return getConfig((ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration(), needToAddedConfig);
+        }
+        Collection<ReadwriteSplittingDataSourceRuleConfiguration> dataSourceConfigs = new LinkedList<>();
+        dataSourceConfigs.add(needToAddedConfig);
+        return new ReadwriteSplittingRuleConfiguration(dataSourceConfigs, Collections.emptyMap());
+    }
+    
+    private ReadwriteSplittingRuleConfiguration getConfig(final ReadwriteSplittingRuleConfiguration result, final ReadwriteSplittingDataSourceRuleConfiguration dataSourceRuleConfig) {
+        if (null == result.getDataSources()) {
+            Collection<ReadwriteSplittingDataSourceRuleConfiguration> dataSources = new LinkedList<>();
+            dataSources.add(dataSourceRuleConfig);
+            return new ReadwriteSplittingRuleConfiguration(dataSources, result.getLoadBalancers());
+        }
+        result.getDataSources().add(dataSourceRuleConfig);
+        return result;
+    }
+    
     private ReadwriteSplittingDataSourceRuleConfiguration swapDataSource(final String name, final String yamlContext) {
         YamlReadwriteSplittingDataSourceRuleConfiguration yamlDataSourceRuleConfig = YamlEngine.unmarshal(yamlContext, YamlReadwriteSplittingDataSourceRuleConfiguration.class);
         return new ReadwriteSplittingDataSourceRuleConfiguration(name, yamlDataSourceRuleConfig.getWriteDataSourceName(), yamlDataSourceRuleConfig.getReadDataSourceNames(),
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
index 0160aff21d2..570ed6681b7 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/ReadwriteSplittingLoadBalanceSubscriber.java
@@ -32,7 +32,10 @@ import org.apache.shardingsphere.readwritesplitting.event.loadbalance.AlterLoadB
 import org.apache.shardingsphere.readwritesplitting.event.loadbalance.DeleteLoadBalanceEvent;
 import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
 
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Readwrite-splitting load-balance subscriber.
@@ -55,10 +58,10 @@ public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleChange
         if (!event.getActiveVersion().equals(instanceContext.getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
             return;
         }
-        ShardingSphereDatabase database = databases.get(event.getDatabaseName());
-        ReadwriteSplittingRuleConfiguration config = (ReadwriteSplittingRuleConfiguration) database.getRuleMetaData().getSingleRule(ReadwriteSplittingRule.class).getConfiguration();
-        config.getLoadBalancers().put(event.getLoadBalanceName(),
-                swapToAlgorithmConfig(instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion())));
+        AlgorithmConfiguration needToAltered =
+                swapToAlgorithmConfig(instanceContext.getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion()));
+        instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(),
+                getConfig(databases.get(event.getDatabaseName()), event.getLoadBalanceName(), needToAltered)));
     }
     
     /**
@@ -74,6 +77,26 @@ public final class ReadwriteSplittingLoadBalanceSubscriber implements RuleChange
         instanceContext.getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
     }
     
+    private ReadwriteSplittingRuleConfiguration getConfig(final ShardingSphereDatabase database, final String loadBalanceName, final AlgorithmConfiguration needToAltered) {
+        Optional<ReadwriteSplittingRule> rule = database.getRuleMetaData().findSingleRule(ReadwriteSplittingRule.class);
+        if (rule.isPresent()) {
+            return getConfig((ReadwriteSplittingRuleConfiguration) rule.get().getConfiguration(), loadBalanceName, needToAltered);
+        }
+        Map<String, AlgorithmConfiguration> loadBalancers = new LinkedHashMap<>();
+        loadBalancers.put(loadBalanceName, needToAltered);
+        return new ReadwriteSplittingRuleConfiguration(Collections.emptyList(), loadBalancers);
+    }
+    
+    private ReadwriteSplittingRuleConfiguration getConfig(final ReadwriteSplittingRuleConfiguration result, final String loadBalanceName, final AlgorithmConfiguration needToAltered) {
+        if (null == result.getLoadBalancers()) {
+            Map<String, AlgorithmConfiguration> loadBalancers = new LinkedHashMap<>();
+            loadBalancers.put(loadBalanceName, needToAltered);
+            return new ReadwriteSplittingRuleConfiguration(result.getDataSources(), loadBalancers);
+        }
+        result.getLoadBalancers().put(loadBalanceName, needToAltered);
+        return result;
+    }
+    
     private AlgorithmConfiguration swapToAlgorithmConfig(final String yamlContext) {
         return new YamlAlgorithmConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContext, YamlAlgorithmConfiguration.class));
     }