You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/27 11:15:14 UTC

[shardingsphere] branch master updated: Refactor of scaling switch cluster configuration (#13315)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 73846fd  Refactor of scaling switch cluster configuration (#13315)
73846fd is described below

commit 73846fd843cf695285b7951560825786a72d70f2
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Oct 27 19:14:33 2021 +0800

    Refactor of scaling switch cluster configuration (#13315)
    
    * Just switch cluster configuration when the rule cache has changed and there is no scaling job to start
    
    * Use rules snapshot to switch cluster configuration
    
    * Minor refactor
    
    * Keep deleting the rule cache when the scaling job has finished
---
 .../subscriber/ScalingRegistrySubscriber.java      | 34 ++++++++--------------
 .../event/rule/ScalingTaskFinishedEvent.java       | 10 ++++++-
 .../event/rule/SwitchRuleConfigurationEvent.java   | 33 ---------------------
 .../subscriber/ScalingRegistrySubscriberTest.java  |  9 ------
 .../proxy/initializer/BootstrapInitializer.java    |  1 -
 .../scaling/core/api/ScalingWorker.java            | 30 ++++++++++---------
 .../scaling/core/api/impl/ScalingAPIImpl.java      |  6 +++-
 7 files changed, 43 insertions(+), 80 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index 73b1aca..ddf2c48 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
 
 import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
@@ -26,14 +27,12 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationCachedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
 import org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
 import org.apache.shardingsphere.mode.metadata.persist.service.impl.SchemaRulePersistService;
 import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
 
 import java.util.Collection;
@@ -45,6 +44,7 @@ import java.util.stream.Collectors;
 /**
  * Scaling registry subscriber.
  */
+@Slf4j
 // TODO move to scaling module
 public final class ScalingRegistrySubscriber {
     
@@ -65,29 +65,12 @@ public final class ScalingRegistrySubscriber {
     }
     
     /**
-     * Switch rule configuration.
-     *
-     * @param event switch rule configuration event
-     */
-    @Subscribe
-    public void switchRuleConfiguration(final SwitchRuleConfigurationEvent event) {
-        persistService.persist(event.getSchemaName(), loadCachedRuleConfigurations(event.getSchemaName(), event.getRuleConfigurationCacheId()));
-        registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getRuleConfigurationCacheId());
-    }
-    
-    @SuppressWarnings("unchecked")
-    private Collection<RuleConfiguration> loadCachedRuleConfigurations(final String schemaName, final String ruleConfigCacheId) {
-        return new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
-                YamlEngine.unmarshal(registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(schemaName), ruleConfigCacheId), Collection.class));
-    }
-    
-    /**
-     * Cache rule configuration.
+     * Rule configuration cached.
      *
      * @param event rule configuration cached event
      */
     @Subscribe
-    public void cacheRuleConfiguration(final RuleConfigurationCachedEvent event) {
+    public void ruleConfigurationCached(final RuleConfigurationCachedEvent event) {
         String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
         String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
         String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
@@ -103,12 +86,18 @@ public final class ScalingRegistrySubscriber {
      */
     @Subscribe
     public void scalingTaskFinished(final ScalingTaskFinishedEvent event) {
-        YamlRootConfiguration yamlRootConfiguration = YamlEngine.unmarshal(event.getTargetParameter(), YamlRootConfiguration.class);
+        log.info("scalingTaskFinished, event={}", event);
+        YamlRootConfiguration yamlRootConfiguration = event.getTargetRootConfig();
         Map<String, DataSourceConfiguration> dataSourceConfigs = yamlRootConfiguration.getDataSources().entrySet().stream().collect(Collectors.toMap(
                 Entry::getKey, entry -> new YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfiguration.getRules());
         ClusterSwitchConfigurationEvent switchEvent = new ClusterSwitchConfigurationEvent(event.getTargetSchemaName(), dataSourceConfigs, ruleConfigs);
         ShardingSphereEventBus.getInstance().post(switchEvent);
+        String ruleCacheId = event.getRuleCacheId();
+        if (null != ruleCacheId) {
+            log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
+            registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()), ruleCacheId);
+        }
     }
     
     /**
@@ -119,6 +108,7 @@ public final class ScalingRegistrySubscriber {
     @Subscribe
     public void clusterSwitchConfiguration(final ClusterSwitchConfigurationEvent event) {
         String schemaName = event.getTargetSchemaName();
+        log.info("clusterSwitchConfiguration, schemaName={}", schemaName);
         dataSourcePersistService.persist(schemaName, event.getTargetDataSourceConfigs());
         persistService.persist(schemaName, event.getTargetRuleConfigs());
     }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
index f96cb63..1a075e5 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.conf
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 
 /**
  * Scaling task finished event.
@@ -31,5 +32,12 @@ public final class ScalingTaskFinishedEvent {
     @NonNull
     private final String targetSchemaName;
     
-    private final String targetParameter;
+    private final YamlRootConfiguration targetRootConfig;
+    
+    private final String ruleCacheId;
+    
+    @Override
+    public String toString() {
+        return "ScalingTaskFinishedEvent{" + "targetSchemaName='" + targetSchemaName + '\'' + ", ruleCacheId='" + ruleCacheId + '\'' + '}';
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java
deleted file mode 100644
index 3c651de..0000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Switch rule configuration event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class SwitchRuleConfigurationEvent {
-    
-    private final String schemaName;
-    
-    private final String ruleConfigurationCacheId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
index 01e0322..3e184f9 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
@@ -54,15 +54,6 @@ public final class ScalingRegistrySubscriberTest {
     }
     
     @Test
-    public void assertSwitchRuleConfiguration() throws ReflectiveOperationException {
-        // Move to scaling module
-//        when(registryCacheManager.loadCache(anyString(), eq("testCacheId"))).thenReturn(readYAML());
-//        SwitchRuleConfigurationEvent event = new SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
-//        scalingRegistrySubscriber.switchRuleConfiguration(event);
-        // TODO finish verify
-    }
-    
-    @Test
     public void assertCacheRuleConfiguration() {
         // TODO finish test case
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index d8894a9..e0f6b97 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -129,7 +129,6 @@ public final class BootstrapInitializer {
         if (null == yamlConfig.getServerConfiguration().getScaling()) {
             return Optional.empty();
         }
-        
         ServerConfiguration result = new ServerConfiguration();
         result.setBlockQueueSize(yamlConfig.getServerConfiguration().getScaling().getBlockQueueSize());
         result.setWorkerThread(yamlConfig.getServerConfiguration().getScaling().getWorkerThread());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 987357c..5cea979 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
@@ -72,24 +72,32 @@ public final class ScalingWorker {
     @Subscribe
     public void start(final StartScalingEvent event) {
         log.info("Start scaling job by {}", event);
+        Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
+        Optional<Long> jobId = jobConfigOptional.isPresent() ? scalingAPI.start(jobConfigOptional.get()) : Optional.empty();
+        if (!jobId.isPresent()) {
+            log.info("Switch rule configuration immediately.");
+            YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+            ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), targetRootConfig, event.getRuleCacheId());
+            ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+        }
+    }
+    
+    private Optional<JobConfiguration> createJobConfig(final StartScalingEvent event) {
         YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
         YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
         Optional<YamlShardingRuleConfiguration> sourceShardingConfigOptional = getYamlShardingRuleConfiguration(sourceRootConfig);
         Optional<YamlShardingRuleConfiguration> targetShardingConfigOptional = getYamlShardingRuleConfiguration(targetRootConfig);
         if (!sourceShardingConfigOptional.isPresent() || !targetShardingConfigOptional.isPresent()) {
             log.info("sourceShardingConfig or targetShardingConfig not present, ignore");
-            return;
+            return Optional.empty();
         }
         if (isShardingRulesTheSame(sourceShardingConfigOptional.get(), targetShardingConfigOptional.get())) {
             log.info("source and target sharding configuration is the same, ignore");
-            return;
-        }
-        JobConfiguration jobConfig = new JobConfiguration(getRuleConfiguration(sourceRootConfig, targetRootConfig), getHandleConfiguration(event));
-        Optional<Long> jobId = scalingAPI.start(jobConfig);
-        if (!jobId.isPresent()) {
-            log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
-            ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
+            return Optional.empty();
         }
+        RuleConfiguration ruleConfig = getRuleConfiguration(sourceRootConfig, targetRootConfig);
+        HandleConfiguration handleConfig = new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+        return Optional.of(new JobConfiguration(ruleConfig, handleConfig));
     }
     
     private Optional<YamlShardingRuleConfiguration> getYamlShardingRuleConfiguration(final YamlRootConfiguration rootConfig) {
@@ -115,10 +123,6 @@ public final class ScalingWorker {
         return result;
     }
     
-    private HandleConfiguration getHandleConfiguration(final StartScalingEvent event) {
-        return new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
-    }
-    
     @SuppressWarnings("unchecked")
     private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
         YamlRootConfiguration result = new YamlRootConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index aa1b619..b5c6aec 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.TypedSPIConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 import org.apache.shardingsphere.scaling.core.api.DataConsistencyCheckAlgorithmInfo;
@@ -258,7 +259,10 @@ public final class ScalingAPIImpl implements ScalingAPI {
         Optional<Collection<JobContext>> optionalJobContexts = JobSchedulerCenter.getJobContexts(jobId);
         optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
         ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
-        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
+        YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(targetConfig.getParameter(), YamlRootConfiguration.class);
+        WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
+        String ruleCacheId = null != workflowConfig ? workflowConfig.getRuleCacheId() : null;
+        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), yamlRootConfig, ruleCacheId);
         ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
         optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> {
             each.setStatus(JobStatus.FINISHED);