You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/27 11:15:14 UTC
[shardingsphere] branch master updated: Refactor of scaling switch cluster configuration (#13315)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 73846fd Refactor of scaling switch cluster configuration (#13315)
73846fd is described below
commit 73846fd843cf695285b7951560825786a72d70f2
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Oct 27 19:14:33 2021 +0800
Refactor of scaling switch cluster configuration (#13315)
* Switch the cluster configuration immediately only when the rule cache has changed and there is no scaling job to start
* Use rules snapshot to switch cluster configuration
* Minor refactor
* Keep deleting the rule cache when the scaling job finishes
---
.../subscriber/ScalingRegistrySubscriber.java | 34 ++++++++--------------
.../event/rule/ScalingTaskFinishedEvent.java | 10 ++++++-
.../event/rule/SwitchRuleConfigurationEvent.java | 33 ---------------------
.../subscriber/ScalingRegistrySubscriberTest.java | 9 ------
.../proxy/initializer/BootstrapInitializer.java | 1 -
.../scaling/core/api/ScalingWorker.java | 30 ++++++++++---------
.../scaling/core/api/impl/ScalingAPIImpl.java | 6 +++-
7 files changed, 43 insertions(+), 80 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index 73b1aca..ddf2c48 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
@@ -26,14 +27,12 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationCachedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
import org.apache.shardingsphere.mode.metadata.persist.service.impl.SchemaRulePersistService;
import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import java.util.Collection;
@@ -45,6 +44,7 @@ import java.util.stream.Collectors;
/**
* Scaling registry subscriber.
*/
+@Slf4j
// TODO move to scaling module
public final class ScalingRegistrySubscriber {
@@ -65,29 +65,12 @@ public final class ScalingRegistrySubscriber {
}
/**
- * Switch rule configuration.
- *
- * @param event switch rule configuration event
- */
- @Subscribe
- public void switchRuleConfiguration(final SwitchRuleConfigurationEvent event) {
- persistService.persist(event.getSchemaName(), loadCachedRuleConfigurations(event.getSchemaName(), event.getRuleConfigurationCacheId()));
- registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getRuleConfigurationCacheId());
- }
-
- @SuppressWarnings("unchecked")
- private Collection<RuleConfiguration> loadCachedRuleConfigurations(final String schemaName, final String ruleConfigCacheId) {
- return new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(
- YamlEngine.unmarshal(registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(schemaName), ruleConfigCacheId), Collection.class));
- }
-
- /**
- * Cache rule configuration.
+ * Rule configuration cached.
*
* @param event rule configuration cached event
*/
@Subscribe
- public void cacheRuleConfiguration(final RuleConfigurationCachedEvent event) {
+ public void ruleConfigurationCached(final RuleConfigurationCachedEvent event) {
String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
@@ -103,12 +86,18 @@ public final class ScalingRegistrySubscriber {
*/
@Subscribe
public void scalingTaskFinished(final ScalingTaskFinishedEvent event) {
- YamlRootConfiguration yamlRootConfiguration = YamlEngine.unmarshal(event.getTargetParameter(), YamlRootConfiguration.class);
+ log.info("scalingTaskFinished, event={}", event);
+ YamlRootConfiguration yamlRootConfiguration = event.getTargetRootConfig();
Map<String, DataSourceConfiguration> dataSourceConfigs = yamlRootConfiguration.getDataSources().entrySet().stream().collect(Collectors.toMap(
Entry::getKey, entry -> new YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfiguration.getRules());
ClusterSwitchConfigurationEvent switchEvent = new ClusterSwitchConfigurationEvent(event.getTargetSchemaName(), dataSourceConfigs, ruleConfigs);
ShardingSphereEventBus.getInstance().post(switchEvent);
+ String ruleCacheId = event.getRuleCacheId();
+ if (null != ruleCacheId) {
+ log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
+ registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()), ruleCacheId);
+ }
}
/**
@@ -119,6 +108,7 @@ public final class ScalingRegistrySubscriber {
@Subscribe
public void clusterSwitchConfiguration(final ClusterSwitchConfigurationEvent event) {
String schemaName = event.getTargetSchemaName();
+ log.info("clusterSwitchConfiguration, schemaName={}", schemaName);
dataSourcePersistService.persist(schemaName, event.getTargetDataSourceConfigs());
persistService.persist(schemaName, event.getTargetRuleConfigs());
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
index f96cb63..1a075e5 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.conf
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
/**
* Scaling task finished event.
@@ -31,5 +32,12 @@ public final class ScalingTaskFinishedEvent {
@NonNull
private final String targetSchemaName;
- private final String targetParameter;
+ private final YamlRootConfiguration targetRootConfig;
+
+ private final String ruleCacheId;
+
+ @Override
+ public String toString() {
+ return "ScalingTaskFinishedEvent{" + "targetSchemaName='" + targetSchemaName + '\'' + ", ruleCacheId='" + ruleCacheId + '\'' + '}';
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java
deleted file mode 100644
index 3c651de..0000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/SwitchRuleConfigurationEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Switch rule configuration event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class SwitchRuleConfigurationEvent {
-
- private final String schemaName;
-
- private final String ruleConfigurationCacheId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
index 01e0322..3e184f9 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriberTest.java
@@ -54,15 +54,6 @@ public final class ScalingRegistrySubscriberTest {
}
@Test
- public void assertSwitchRuleConfiguration() throws ReflectiveOperationException {
- // Move to scaling module
-// when(registryCacheManager.loadCache(anyString(), eq("testCacheId"))).thenReturn(readYAML());
-// SwitchRuleConfigurationEvent event = new SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
-// scalingRegistrySubscriber.switchRuleConfiguration(event);
- // TODO finish verify
- }
-
- @Test
public void assertCacheRuleConfiguration() {
// TODO finish test case
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index d8894a9..e0f6b97 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -129,7 +129,6 @@ public final class BootstrapInitializer {
if (null == yamlConfig.getServerConfiguration().getScaling()) {
return Optional.empty();
}
-
ServerConfiguration result = new ServerConfiguration();
result.setBlockQueueSize(yamlConfig.getServerConfiguration().getScaling().getBlockQueueSize());
result.setWorkerThread(yamlConfig.getServerConfiguration().getScaling().getWorkerThread());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 987357c..5cea979 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
@@ -72,24 +72,32 @@ public final class ScalingWorker {
@Subscribe
public void start(final StartScalingEvent event) {
log.info("Start scaling job by {}", event);
+ Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
+ Optional<Long> jobId = jobConfigOptional.isPresent() ? scalingAPI.start(jobConfigOptional.get()) : Optional.empty();
+ if (!jobId.isPresent()) {
+ log.info("Switch rule configuration immediately.");
+ YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+ ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), targetRootConfig, event.getRuleCacheId());
+ ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
+ }
+ }
+
+ private Optional<JobConfiguration> createJobConfig(final StartScalingEvent event) {
YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
Optional<YamlShardingRuleConfiguration> sourceShardingConfigOptional = getYamlShardingRuleConfiguration(sourceRootConfig);
Optional<YamlShardingRuleConfiguration> targetShardingConfigOptional = getYamlShardingRuleConfiguration(targetRootConfig);
if (!sourceShardingConfigOptional.isPresent() || !targetShardingConfigOptional.isPresent()) {
log.info("sourceShardingConfig or targetShardingConfig not present, ignore");
- return;
+ return Optional.empty();
}
if (isShardingRulesTheSame(sourceShardingConfigOptional.get(), targetShardingConfigOptional.get())) {
log.info("source and target sharding configuration is the same, ignore");
- return;
- }
- JobConfiguration jobConfig = new JobConfiguration(getRuleConfiguration(sourceRootConfig, targetRootConfig), getHandleConfiguration(event));
- Optional<Long> jobId = scalingAPI.start(jobConfig);
- if (!jobId.isPresent()) {
- log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
- ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
+ return Optional.empty();
}
+ RuleConfiguration ruleConfig = getRuleConfiguration(sourceRootConfig, targetRootConfig);
+ HandleConfiguration handleConfig = new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+ return Optional.of(new JobConfiguration(ruleConfig, handleConfig));
}
private Optional<YamlShardingRuleConfiguration> getYamlShardingRuleConfiguration(final YamlRootConfiguration rootConfig) {
@@ -115,10 +123,6 @@ public final class ScalingWorker {
return result;
}
- private HandleConfiguration getHandleConfiguration(final StartScalingEvent event) {
- return new HandleConfiguration(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
- }
-
@SuppressWarnings("unchecked")
private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
YamlRootConfiguration result = new YamlRootConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index aa1b619..b5c6aec 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.TypedSPIConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.api.DataConsistencyCheckAlgorithmInfo;
@@ -258,7 +259,10 @@ public final class ScalingAPIImpl implements ScalingAPI {
Optional<Collection<JobContext>> optionalJobContexts = JobSchedulerCenter.getJobContexts(jobId);
optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> each.setStatus(JobStatus.ALMOST_FINISHED)));
ScalingDataSourceConfigurationWrap targetConfig = jobConfig.getRuleConfig().getTarget();
- ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), targetConfig.getParameter());
+ YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(targetConfig.getParameter(), YamlRootConfiguration.class);
+ WorkflowConfiguration workflowConfig = jobConfig.getHandleConfig().getWorkflowConfig();
+ String ruleCacheId = null != workflowConfig ? workflowConfig.getRuleCacheId() : null;
+ ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(targetConfig.getSchemaName(), yamlRootConfig, ruleCacheId);
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each -> {
each.setStatus(JobStatus.FINISHED);