You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/26 01:53:33 UTC

[shardingsphere] branch master updated: Improve scaling job start process when rule cache changed (#13266)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0133432  Improve scaling job start process when rule cache changed (#13266)
0133432 is described below

commit 0133432127b272f0104c693de1175e33c1f5161e
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Oct 26 09:52:48 2021 +0800

    Improve scaling job start process when rule cache changed (#13266)
    
    * Fix ShardingSphereJDBCDataSourceConfiguration init exception
    
    * Improve creating scaling job
    
    * Unit test
---
 .../subscriber/ScalingRegistrySubscriber.java      |  9 ++++---
 .../event/rule/ScalingTaskFinishedEvent.java       |  2 ++
 .../scaling/core/api/ScalingWorker.java            | 22 ++++++++++++++--
 .../scaling/core/config/RuleConfiguration.java     | 29 ++++++++++++++++++++--
 .../ShardingSphereJDBCDataSourceConfiguration.java | 10 ++++++--
 .../YamlParameterConfiguration.java}               | 22 +++++++++++-----
 .../scaling/core/util/JDBCUtilTest.java            |  2 +-
 7 files changed, 79 insertions(+), 17 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index f77c7cf..73b1aca 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -88,10 +88,11 @@ public final class ScalingRegistrySubscriber {
      */
     @Subscribe
     public void cacheRuleConfiguration(final RuleConfigurationCachedEvent event) {
-        StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(),
-                repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName())),
-                repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName())),
-                registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId()), event.getCacheId());
+        String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
+        String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
+        String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
+        String ruleCacheId = event.getCacheId();
+        StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule, targetRule, ruleCacheId);
         ShardingSphereEventBus.getInstance().post(startScalingEvent);
     }
     
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
index b2f1e13..f96cb63 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
 
 import lombok.Getter;
+import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
 /**
@@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor;
 @Getter
 public final class ScalingTaskFinishedEvent {
     
+    @NonNull
     private final String targetSchemaName;
     
     private final String targetParameter;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 4f1efb9..086cfb8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.scaling.core.api;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
 import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
@@ -30,6 +33,8 @@ import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJD
 import org.apache.shardingsphere.scaling.core.executor.job.FinishedCheckJobExecutor;
 import org.apache.shardingsphere.scaling.core.executor.job.ScalingJobExecutor;
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -75,8 +80,21 @@ public final class ScalingWorker {
     
     private RuleConfiguration getRuleConfiguration(final StartScalingEvent event) {
         RuleConfiguration result = new RuleConfiguration();
-        result.setSource(new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()).wrap());
-        result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule()).wrap());
+        YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
+        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+        result.setSource(new ShardingSphereJDBCDataSourceConfiguration(sourceRootConfig).wrap());
+        result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(targetRootConfig).wrap());
+        return result;
+    }
+    
+    @SuppressWarnings("unchecked")
+    private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setSchemaName(schemaName);
+        Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(dataSources, Map.class);
+        result.setDataSources(yamlDataSources);
+        Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class);
+        result.setRules(yamlRuleConfigs);
         return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
index 53218c9..93597ca 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
@@ -17,18 +17,43 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
+import com.google.common.base.Preconditions;
 import lombok.Getter;
-import lombok.Setter;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
 
 /**
  * Rule configuration.
  */
 @Getter
-@Setter
 public final class RuleConfiguration {
     
     private ScalingDataSourceConfigurationWrap source;
     
     private ScalingDataSourceConfigurationWrap target;
+    
+    /**
+     * Set source.
+     *
+     * @param source source configuration
+     */
+    public void setSource(final ScalingDataSourceConfigurationWrap source) {
+        checkParameters(source);
+        this.source = source;
+    }
+    
+    private void checkParameters(final ScalingDataSourceConfigurationWrap wrap) {
+        Preconditions.checkNotNull(wrap);
+        Preconditions.checkNotNull(wrap.getType());
+        Preconditions.checkNotNull(wrap.getParameter());
+    }
+    
+    /**
+     * Set target.
+     *
+     * @param target target configuration
+     */
+    public void setTarget(final ScalingDataSourceConfigurationWrap target) {
+        checkParameters(target);
+        this.target = target;
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
index c8ebd71..423d5c7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.config.yaml.YamlParameterConfiguration;
 import org.apache.shardingsphere.scaling.core.util.JDBCUtil;
 
 import javax.sql.DataSource;
@@ -58,13 +59,18 @@ public final class ShardingSphereJDBCDataSourceConfiguration implements ScalingD
         databaseType = DatabaseTypeRegistry.getDatabaseTypeByURL(JDBCUtil.getJdbcUrl(props));
     }
     
-    public ShardingSphereJDBCDataSourceConfiguration(final String dataSources, final String rules) {
-        this(String.format("%s\n%s", dataSources, rules));
+    public ShardingSphereJDBCDataSourceConfiguration(final YamlRootConfiguration rootConfig) {
+        YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
+        this.parameter = YamlEngine.marshal(parameterConfig);
+        this.rootConfig = rootConfig;
+        Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
+        databaseType = DatabaseTypeRegistry.getDatabaseTypeByURL(JDBCUtil.getJdbcUrl(props));
     }
     
     @Override
     public ScalingDataSourceConfigurationWrap wrap() {
         ScalingDataSourceConfigurationWrap result = new ScalingDataSourceConfigurationWrap();
+        result.setSchemaName(rootConfig.getSchemaName());
         result.setType(TYPE);
         result.setParameter(parameter);
         return result;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
similarity index 56%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
index 53218c9..635a692 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
@@ -15,20 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.config.yaml;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
 
 /**
- * Rule configuration.
+ * YAML parameter configuration.
  */
+@NoArgsConstructor
+@AllArgsConstructor
 @Getter
 @Setter
-public final class RuleConfiguration {
+public final class YamlParameterConfiguration implements YamlConfiguration {
     
-    private ScalingDataSourceConfigurationWrap source;
+    private Map<String, Map<String, Object>> dataSources = new HashMap<>();
     
-    private ScalingDataSourceConfigurationWrap target;
+    private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
index 742a42d..6d6d7be 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
@@ -43,7 +43,7 @@ public final class JDBCUtilTest {
     
     @Test
     public void assertAppendShardingSphereJDBCDataSourceConfig() {
-        ShardingSphereJDBCDataSourceConfiguration dataSourceConfig = new ShardingSphereJDBCDataSourceConfiguration(getDataSourceYaml(), "");
+        ShardingSphereJDBCDataSourceConfiguration dataSourceConfig = new ShardingSphereJDBCDataSourceConfiguration(getDataSourceYaml());
         JDBCUtil.appendJDBCParameter(dataSourceConfig, ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
         List<DataSourceConfiguration> actual = new ArrayList<>(getDataSourceConfigurations(dataSourceConfig.getRootConfig().getDataSources()).values());
         assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));