You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/26 01:53:33 UTC
[shardingsphere] branch master updated: Improve scaling job start process when rule cache changed (#13266)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0133432 Improve scaling job start process when rule cache changed (#13266)
0133432 is described below
commit 0133432127b272f0104c693de1175e33c1f5161e
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Oct 26 09:52:48 2021 +0800
Improve scaling job start process when rule cache changed (#13266)
* Fix ShardingSphereJDBCDataSourceConfiguration init exception
* Improve creating scaling job
* Unit test
---
.../subscriber/ScalingRegistrySubscriber.java | 9 ++++---
.../event/rule/ScalingTaskFinishedEvent.java | 2 ++
.../scaling/core/api/ScalingWorker.java | 22 ++++++++++++++--
.../scaling/core/config/RuleConfiguration.java | 29 ++++++++++++++++++++--
.../ShardingSphereJDBCDataSourceConfiguration.java | 10 ++++++--
.../YamlParameterConfiguration.java} | 22 +++++++++++-----
.../scaling/core/util/JDBCUtilTest.java | 2 +-
7 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index f77c7cf..73b1aca 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -88,10 +88,11 @@ public final class ScalingRegistrySubscriber {
*/
@Subscribe
public void cacheRuleConfiguration(final RuleConfigurationCachedEvent event) {
- StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(),
- repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName())),
- repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName())),
- registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId()), event.getCacheId());
+ String sourceDataSource = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
+ String sourceRule = repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
+ String targetRule = registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()), event.getCacheId());
+ String ruleCacheId = event.getCacheId();
+ StartScalingEvent startScalingEvent = new StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule, targetRule, ruleCacheId);
ShardingSphereEventBus.getInstance().post(startScalingEvent);
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
index b2f1e13..f96cb63 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
import lombok.Getter;
+import lombok.NonNull;
import lombok.RequiredArgsConstructor;
/**
@@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor;
@Getter
public final class ScalingTaskFinishedEvent {
+ @NonNull
private final String targetSchemaName;
private final String targetParameter;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
index 4f1efb9..086cfb8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingWorker.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.scaling.core.api;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.SwitchRuleConfigurationEvent;
import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
@@ -30,6 +33,8 @@ import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJD
import org.apache.shardingsphere.scaling.core.executor.job.FinishedCheckJobExecutor;
import org.apache.shardingsphere.scaling.core.executor.job.ScalingJobExecutor;
+import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
/**
@@ -75,8 +80,21 @@ public final class ScalingWorker {
private RuleConfiguration getRuleConfiguration(final StartScalingEvent event) {
RuleConfiguration result = new RuleConfiguration();
- result.setSource(new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()).wrap());
- result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule()).wrap());
+ YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getSourceDataSource(), event.getSourceRule());
+ YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(), event.getTargetRule());
+ result.setSource(new ShardingSphereJDBCDataSourceConfiguration(sourceRootConfig).wrap());
+ result.setTarget(new ShardingSphereJDBCDataSourceConfiguration(targetRootConfig).wrap());
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private YamlRootConfiguration getYamlRootConfiguration(final String schemaName, final String dataSources, final String rules) {
+ YamlRootConfiguration result = new YamlRootConfiguration();
+ result.setSchemaName(schemaName);
+ Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(dataSources, Map.class);
+ result.setDataSources(yamlDataSources);
+ Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class);
+ result.setRules(yamlRuleConfigs);
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
index 53218c9..93597ca 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
@@ -17,18 +17,43 @@
package org.apache.shardingsphere.scaling.core.config;
+import com.google.common.base.Preconditions;
import lombok.Getter;
-import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
/**
* Rule configuration.
*/
@Getter
-@Setter
public final class RuleConfiguration {
private ScalingDataSourceConfigurationWrap source;
private ScalingDataSourceConfigurationWrap target;
+
+ /**
+ * Set source.
+ *
+ * @param source source configuration
+ */
+ public void setSource(final ScalingDataSourceConfigurationWrap source) {
+ checkParameters(source);
+ this.source = source;
+ }
+
+ private void checkParameters(final ScalingDataSourceConfigurationWrap wrap) {
+ Preconditions.checkNotNull(wrap);
+ Preconditions.checkNotNull(wrap.getType());
+ Preconditions.checkNotNull(wrap.getParameter());
+ }
+
+ /**
+ * Set target.
+ *
+ * @param target target configuration
+ */
+ public void setTarget(final ScalingDataSourceConfigurationWrap target) {
+ checkParameters(target);
+ this.target = target;
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
index c8ebd71..423d5c7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ShardingSphereJDBCDataSourceConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.config.yaml.YamlParameterConfiguration;
import org.apache.shardingsphere.scaling.core.util.JDBCUtil;
import javax.sql.DataSource;
@@ -58,13 +59,18 @@ public final class ShardingSphereJDBCDataSourceConfiguration implements ScalingD
databaseType = DatabaseTypeRegistry.getDatabaseTypeByURL(JDBCUtil.getJdbcUrl(props));
}
- public ShardingSphereJDBCDataSourceConfiguration(final String dataSources, final String rules) {
- this(String.format("%s\n%s", dataSources, rules));
+ public ShardingSphereJDBCDataSourceConfiguration(final YamlRootConfiguration rootConfig) {
+ YamlParameterConfiguration parameterConfig = new YamlParameterConfiguration(rootConfig.getDataSources(), rootConfig.getRules());
+ this.parameter = YamlEngine.marshal(parameterConfig);
+ this.rootConfig = rootConfig;
+ Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
+ databaseType = DatabaseTypeRegistry.getDatabaseTypeByURL(JDBCUtil.getJdbcUrl(props));
}
@Override
public ScalingDataSourceConfigurationWrap wrap() {
ScalingDataSourceConfigurationWrap result = new ScalingDataSourceConfigurationWrap();
+ result.setSchemaName(rootConfig.getSchemaName());
result.setType(TYPE);
result.setParameter(parameter);
return result;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
similarity index 56%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
index 53218c9..635a692 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RuleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/yaml/YamlParameterConfiguration.java
@@ -15,20 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.config.yaml;
+import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
/**
- * Rule configuration.
+ * YAML parameter configuration.
*/
+@NoArgsConstructor
+@AllArgsConstructor
@Getter
@Setter
-public final class RuleConfiguration {
+public final class YamlParameterConfiguration implements YamlConfiguration {
- private ScalingDataSourceConfigurationWrap source;
+ private Map<String, Map<String, Object>> dataSources = new HashMap<>();
- private ScalingDataSourceConfigurationWrap target;
+ private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
index 742a42d..6d6d7be 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
@@ -43,7 +43,7 @@ public final class JDBCUtilTest {
@Test
public void assertAppendShardingSphereJDBCDataSourceConfig() {
- ShardingSphereJDBCDataSourceConfiguration dataSourceConfig = new ShardingSphereJDBCDataSourceConfiguration(getDataSourceYaml(), "");
+ ShardingSphereJDBCDataSourceConfiguration dataSourceConfig = new ShardingSphereJDBCDataSourceConfiguration(getDataSourceYaml());
JDBCUtil.appendJDBCParameter(dataSourceConfig, ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
List<DataSourceConfiguration> actual = new ArrayList<>(getDataSourceConfigurations(dataSourceConfig.getRootConfig().getDataSources()).values());
assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));