You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/27 02:19:01 UTC
[shardingsphere] branch master updated: Do not start scaling job
when sharding rule content is not changed (#13295)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac9007 Do not start scaling job when sharding rule content is not changed (#13295)
7ac9007 is described below
commit 7ac90071b6bf232798d1fb3da5e79b33b43291d2
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Oct 27 10:17:29 2021 +0800
Do not start scaling job when sharding rule content is not changed (#13295)
---
.../registry/cache/event/StartScalingEvent.java | 5 +++
.../scaling/core/api/ScalingWorker.java | 45 +++++++++++++++++-----
.../scaling/core/config/JobConfiguration.java | 4 ++
3 files changed, 45 insertions(+), 9 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
index ce7e37a..39690bf 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/event/StartScalingEvent.java
@@ -42,4 +42,9 @@ public final class StartScalingEvent {
public StartScalingEvent(final String schemaName, final String sourceDataSource, final String sourceRule, final String targetRule, final String ruleCacheId) {
this(schemaName, sourceDataSource, sourceRule, sourceDataSource, targetRule, ruleCacheId);
}
+
+ @Override
+ public String toString() {
+ return "StartScalingEvent{" + "schemaName='" + schemaName + '\'' + ", ruleCacheId='" + ruleCacheId + '\'' + '}';
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 7483c21..987357c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -33,9 +33,12 @@ import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.executor.job.FinishedCheckJobExecutor;
import org.apache.shardingsphere.scaling.core.executor.job.ScalingJobExecutor;
+import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.yaml.config.rule.YamlTableRuleConfiguration;
import java.util.Collection;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
/**
@@ -69,29 +72,53 @@ public final class ScalingWorker {
@Subscribe
public void start(final StartScalingEvent event) {
log.info("Start scaling job by {}", event);
- Optional<Long> jobId = scalingAPI.start(createJobConfig(event));
+ YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
+ YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+ Optional<YamlShardingRuleConfiguration> sourceShardingConfigOptional = getYamlShardingRuleConfiguration(sourceRootConfig);
+ Optional<YamlShardingRuleConfiguration> targetShardingConfigOptional = getYamlShardingRuleConfiguration(targetRootConfig);
+ if (!sourceShardingConfigOptional.isPresent() || !targetShardingConfigOptional.isPresent()) {
+ log.info("sourceShardingConfig or targetShardingConfig not present, ignore");
+ return;
+ }
+ if (isShardingRulesTheSame(sourceShardingConfigOptional.get(), targetShardingConfigOptional.get())) {
+ log.info("source and target sharding configuration is the same, ignore");
+ return;
+ }
+ JobConfiguration jobConfig = new JobConfiguration(getRuleConfiguration(sourceRootConfig, targetRootConfig), getHandleConfiguration(event));
+ Optional<Long> jobId = scalingAPI.start(jobConfig);
if (!jobId.isPresent()) {
log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
}
}
- private JobConfiguration createJobConfig(final StartScalingEvent event) {
- JobConfiguration result = new JobConfiguration();
- result.setRuleConfig(getRuleConfiguration(event));
- result.setHandleConfig(new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId())));
- return result;
+ private Optional<YamlShardingRuleConfiguration> getYamlShardingRuleConfiguration(final YamlRootConfiguration rootConfig) {
+ return rootConfig.getRules().stream().filter(each -> each instanceof YamlShardingRuleConfiguration).map(each -> (YamlShardingRuleConfiguration) each).findFirst();
}
- private RuleConfiguration getRuleConfiguration(final StartScalingEvent event) {
+ private boolean isShardingRulesTheSame(final YamlShardingRuleConfiguration sourceShardingConfig, final YamlShardingRuleConfiguration targetShardingConfig) {
+ for (Entry<String, YamlTableRuleConfiguration> entry : sourceShardingConfig.getTables().entrySet()) {
+ entry.getValue().setLogicTable(null);
+ }
+ for (Entry<String, YamlTableRuleConfiguration> entry : targetShardingConfig.getTables().entrySet()) {
+ entry.getValue().setLogicTable(null);
+ }
+ String sourceShardingConfigYaml = YamlEngine.marshal(sourceShardingConfig);
+ String targetShardingConfigYaml = YamlEngine.marshal(targetShardingConfig);
+ return sourceShardingConfigYaml.equals(targetShardingConfigYaml);
+ }
+
+ private RuleConfiguration getRuleConfiguration(final YamlRootConfiguration sourceRootConfig, final YamlRootConfiguration targetRootConfig) {
RuleConfiguration result = new RuleConfiguration();
- YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
- YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
result.setSource(new ShardingSphereJDBCDataSourceConfiguration(sourceRootConfig).wrap());
result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(targetRootConfig).wrap());
return result;
}
+ private HandleConfiguration getHandleConfiguration(final StartScalingEvent event) {
+ return new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+ }
+
@SuppressWarnings("unchecked")
private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
YamlRootConfiguration result = new YamlRootConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 06a2991..2a7cba2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -17,12 +17,16 @@
package org.apache.shardingsphere.scaling.core.config;
+import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* Scaling job configuration.
*/
+@NoArgsConstructor
+@AllArgsConstructor
@Getter
@Setter
public final class JobConfiguration {