You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/18 07:53:12 UTC
[shardingsphere] branch master updated: Use new version configuration instead of cache for scaling (#15492)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3020ed1 Use new version configuration instead of cache for scaling (#15492)
3020ed1 is described below
commit 3020ed156ad406a3959602c7217f1dfa49d16607
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Feb 18 15:52:12 2022 +0800
Use new version configuration instead of cache for scaling (#15492)
---
.../service/SchemaVersionPersistService.java | 18 ++++++++++++
.../rdl/rule/RuleDefinitionBackendHandler.java | 33 ++++++++++++++--------
2 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
index eb99a0c..3f9e622 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Schema version persist service.
@@ -52,4 +53,21 @@ public final class SchemaVersionPersistService {
Optional<String> actualVersion = getSchemaActiveVersion(schemaName);
return actualVersion.isPresent() && actualVersion.get().equals(version);
}
+
+ /**
+ * Create new schema version.
+ *
+ * @param schemaName schema name
+ * @return new version
+ */
+ public Optional<String> createNewVersion(final String schemaName) {
+ Optional<String> activeVersion = getSchemaActiveVersion(schemaName);
+ if (activeVersion.isPresent()) {
+ String newVersion = String.valueOf(new AtomicLong(Long.valueOf(activeVersion.get())).incrementAndGet());
+ repository.persist(SchemaMetaDataNode.getRulePath(schemaName, newVersion), repository.get(SchemaMetaDataNode.getRulePath(schemaName, activeVersion.get())));
+ repository.persist(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, newVersion), repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, activeVersion.get())));
+ return Optional.of(newVersion);
+ }
+ return Optional.empty();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 365f051..a3cfc2a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionCreateUpdate
import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionDropUpdater;
import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionUpdater;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -83,7 +84,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
throw new RuntimeException("scaling is not enabled");
}
} else if (preprocessor.isPresent()) {
- processCache(shardingSphereMetaData, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
+ prepareScaling(shardingSphereMetaData, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
return new UpdateResponseHeader(sqlStatement);
}
processSQLStatement(shardingSphereMetaData, sqlStatement, ruleDefinitionUpdater, currentRuleConfig);
@@ -137,15 +138,16 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
}
}
- private void processCache(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater, final RuleConfiguration currentRuleConfig,
+ private void prepareScaling(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater, final RuleConfiguration currentRuleConfig,
final RuleDefinitionAlterPreprocessor preprocessor) {
- RuleConfiguration toBeAlteredRuleConfig = updater.buildToBeAlteredRuleConfiguration(sqlStatement);
- RuleConfiguration alteredRuleConfig = preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
- updater.updateCurrentRuleConfiguration(alteredRuleConfig, toBeAlteredRuleConfig);
- Collection<RuleConfiguration> alteredConfigs = new LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
- alteredConfigs.remove(currentRuleConfig);
- alteredConfigs.add(alteredRuleConfig);
- cacheRuleConfigurationChange(shardingSphereMetaData.getName(), alteredConfigs);
+ Optional<MetaDataPersistService> metaDataPersistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+ if (metaDataPersistService.isPresent()) {
+ Optional<String> newVersion = metaDataPersistService.get().getSchemaVersionPersistService().createNewVersion(shardingSphereMetaData.getName());
+ if (!newVersion.isPresent()) {
+ throw new RuntimeException(String.format("Unable to get a new version for schema: %s", shardingSphereMetaData.getName()));
+ }
+ persistNewVersionConfiguration(shardingSphereMetaData, sqlStatement, updater, currentRuleConfig, preprocessor);
+ }
}
private void persistRuleConfigurationChange(final ShardingSphereMetaData shardingSphereMetaData) {
@@ -153,8 +155,15 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
shardingSphereMetaData.getName(), shardingSphereMetaData.getRuleMetaData().getConfigurations()));
}
- private void cacheRuleConfigurationChange(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations) {
- ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService().ifPresent(optional -> optional.getSchemaRuleService().cache(
- schemaName, ruleConfigurations));
+ private void persistNewVersionConfiguration(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater,
+ final RuleConfiguration currentRuleConfig,
+ final RuleDefinitionAlterPreprocessor preprocessor) {
+ RuleConfiguration toBeAlteredRuleConfig = updater.buildToBeAlteredRuleConfiguration(sqlStatement);
+ RuleConfiguration alteredRuleConfig = preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
+ updater.updateCurrentRuleConfiguration(alteredRuleConfig, toBeAlteredRuleConfig);
+ Collection<RuleConfiguration> alteredConfigs = new LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
+ alteredConfigs.remove(currentRuleConfig);
+ alteredConfigs.add(alteredRuleConfig);
+ // TODO persist altered configs to new schema version
}
}