You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/27 02:19:01 UTC

[shardingsphere] branch master updated: Do not start scaling job when sharding rule content is not changed (#13295)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac9007  Do not start scaling job when sharding rule content is not changed (#13295)
7ac9007 is described below

commit 7ac90071b6bf232798d1fb3da5e79b33b43291d2
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Oct 27 10:17:29 2021 +0800

    Do not start scaling job when sharding rule content is not changed (#13295)
---
 .../registry/cache/event/StartScalingEvent.java    |  5 +++
 .../scaling/core/api/ScalingWorker.java            | 45 +++++++++++++++++-----
 .../scaling/core/config/JobConfiguration.java      |  4 ++
 3 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
index ce7e37a..39690bf 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
@@ -42,4 +42,9 @@ public final class StartScalingEvent {
     public StartScalingEvent(final String schemaName, final String sourceDataSource, final String sourceRule, final String targetRule, final String ruleCacheId) {
         this(schemaName, sourceDataSource, sourceRule, sourceDataSource, targetRule, ruleCacheId);
     }
+    
+    @Override
+    public String toString() {
+        return "StartScalingEvent{" + "schemaName='" + schemaName + '\'' + ", ruleCacheId='" + ruleCacheId + '\'' + '}';
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 7483c21..987357c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -33,9 +33,12 @@ import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.executor.job.FinishedCheckJobExecutor;
 import org.apache.shardingsphere.scaling.core.executor.job.ScalingJobExecutor;
+import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.yaml.config.rule.YamlTableRuleConfiguration;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
 /**
@@ -69,29 +72,53 @@ public final class ScalingWorker {
     @Subscribe
     public void start(final StartScalingEvent event) {
         log.info("Start scaling job by {}", event);
-        Optional<Long> jobId = scalingAPI.start(createJobConfig(event));
+        YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
+        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+        Optional<YamlShardingRuleConfiguration> sourceShardingConfigOptional = getYamlShardingRuleConfiguration(sourceRootConfig);
+        Optional<YamlShardingRuleConfiguration> targetShardingConfigOptional = getYamlShardingRuleConfiguration(targetRootConfig);
+        if (!sourceShardingConfigOptional.isPresent() || !targetShardingConfigOptional.isPresent()) {
+            log.info("sourceShardingConfig or targetShardingConfig not present, ignore");
+            return;
+        }
+        if (isShardingRulesTheSame(sourceShardingConfigOptional.get(), targetShardingConfigOptional.get())) {
+            log.info("source and target sharding configuration is the same, ignore");
+            return;
+        }
+        JobConfiguration jobConfig = new JobConfiguration(getRuleConfiguration(sourceRootConfig, targetRootConfig), getHandleConfiguration(event));
+        Optional<Long> jobId = scalingAPI.start(jobConfig);
         if (!jobId.isPresent()) {
             log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
             ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
         }
     }
     
-    private JobConfiguration createJobConfig(final StartScalingEvent event) {
-        JobConfiguration result = new JobConfiguration();
-        result.setRuleConfig(getRuleConfiguration(event));
-        result.setHandleConfig(new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId())));
-        return result;
+    private Optional<YamlShardingRuleConfiguration> getYamlShardingRuleConfiguration(final YamlRootConfiguration rootConfig) {
+        return rootConfig.getRules().stream().filter(each -> each instanceof YamlShardingRuleConfiguration).map(each -> (YamlShardingRuleConfiguration) each).findFirst();
     }
     
-    private RuleConfiguration getRuleConfiguration(final StartScalingEvent event) {
+    private boolean isShardingRulesTheSame(final YamlShardingRuleConfiguration sourceShardingConfig, final YamlShardingRuleConfiguration targetShardingConfig) {
+        for (Entry<String, YamlTableRuleConfiguration> entry : sourceShardingConfig.getTables().entrySet()) {
+            entry.getValue().setLogicTable(null);
+        }
+        for (Entry<String, YamlTableRuleConfiguration> entry : targetShardingConfig.getTables().entrySet()) {
+            entry.getValue().setLogicTable(null);
+        }
+        String sourceShardingConfigYaml = YamlEngine.marshal(sourceShardingConfig);
+        String targetShardingConfigYaml = YamlEngine.marshal(targetShardingConfig);
+        return sourceShardingConfigYaml.equals(targetShardingConfigYaml);
+    }
+    
+    private RuleConfiguration getRuleConfiguration(final YamlRootConfiguration sourceRootConfig, final YamlRootConfiguration targetRootConfig) {
         RuleConfiguration result = new RuleConfiguration();
-        YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
-        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
         result.setSource(new ShardingSphereJDBCDataSourceConfiguration(sourceRootConfig).wrap());
         result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(targetRootConfig).wrap());
         return result;
     }
     
+    private HandleConfiguration getHandleConfiguration(final StartScalingEvent event) {
+        return new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+    }
+    
     @SuppressWarnings("unchecked")
     private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
         YamlRootConfiguration result = new YamlRootConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 06a2991..2a7cba2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -17,12 +17,16 @@
 
 package org.apache.shardingsphere.scaling.core.config;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 /**
  * Scaling job configuration.
  */
+@NoArgsConstructor
+@AllArgsConstructor
 @Getter
 @Setter
 public final class JobConfiguration {