You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/18 07:53:12 UTC

[shardingsphere] branch master updated: Use new version configuration instead of cache for scaling (#15492)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3020ed1  Use new version configuration instead of cache for scaling (#15492)
3020ed1 is described below

commit 3020ed156ad406a3959602c7217f1dfa49d16607
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Feb 18 15:52:12 2022 +0800

    Use new version configuration instead of cache for scaling (#15492)
---
 .../service/SchemaVersionPersistService.java       | 18 ++++++++++++
 .../rdl/rule/RuleDefinitionBackendHandler.java     | 33 ++++++++++++++--------
 2 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
index eb99a0c..3f9e622 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Schema version persist service.
@@ -52,4 +53,21 @@ public final class SchemaVersionPersistService {
         Optional<String> actualVersion = getSchemaActiveVersion(schemaName);
         return actualVersion.isPresent() && actualVersion.get().equals(version);
     }
+    
+    /**
+     * Create new schema version.
+     * 
+     * @param schemaName schema name
+     * @return new version
+     */
+    public Optional<String> createNewVersion(final String schemaName) {
+        Optional<String> activeVersion = getSchemaActiveVersion(schemaName);
+        if (activeVersion.isPresent()) {
+            String newVersion = String.valueOf(new AtomicLong(Long.valueOf(activeVersion.get())).incrementAndGet());
+            repository.persist(SchemaMetaDataNode.getRulePath(schemaName, newVersion), repository.get(SchemaMetaDataNode.getRulePath(schemaName, activeVersion.get())));
+            repository.persist(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, newVersion), repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, activeVersion.get())));
+            return Optional.of(newVersion);
+        }
+        return Optional.empty();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 365f051..a3cfc2a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionCreateUpdate
 import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionDropUpdater;
 import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionUpdater;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -83,7 +84,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
                 throw new RuntimeException("scaling is not enabled");
             }
         } else if (preprocessor.isPresent()) {
-            processCache(shardingSphereMetaData, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
+            prepareScaling(shardingSphereMetaData, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
             return new UpdateResponseHeader(sqlStatement);
         }
         processSQLStatement(shardingSphereMetaData, sqlStatement, ruleDefinitionUpdater, currentRuleConfig);
@@ -137,15 +138,16 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
         }
     }
     
-    private void processCache(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater, final RuleConfiguration currentRuleConfig,
+    private void prepareScaling(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater, final RuleConfiguration currentRuleConfig,
                               final RuleDefinitionAlterPreprocessor preprocessor) {
-        RuleConfiguration toBeAlteredRuleConfig = updater.buildToBeAlteredRuleConfiguration(sqlStatement);
-        RuleConfiguration alteredRuleConfig = preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
-        updater.updateCurrentRuleConfiguration(alteredRuleConfig, toBeAlteredRuleConfig);
-        Collection<RuleConfiguration> alteredConfigs = new LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
-        alteredConfigs.remove(currentRuleConfig);
-        alteredConfigs.add(alteredRuleConfig);
-        cacheRuleConfigurationChange(shardingSphereMetaData.getName(), alteredConfigs);
+        Optional<MetaDataPersistService> metaDataPersistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+        if (metaDataPersistService.isPresent()) {
+            Optional<String> newVersion = metaDataPersistService.get().getSchemaVersionPersistService().createNewVersion(shardingSphereMetaData.getName());
+            if (!newVersion.isPresent()) {
+                throw new RuntimeException(String.format("Unable to get a new version for schema: %s", shardingSphereMetaData.getName()));
+            }
+            persistNewVersionConfiguration(shardingSphereMetaData, sqlStatement, updater, currentRuleConfig, preprocessor);
+        }
     }
     
     private void persistRuleConfigurationChange(final ShardingSphereMetaData shardingSphereMetaData) {
@@ -153,8 +155,15 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
                 shardingSphereMetaData.getName(), shardingSphereMetaData.getRuleMetaData().getConfigurations()));
     }
     
-    private void cacheRuleConfigurationChange(final String schemaName, final Collection<RuleConfiguration> ruleConfigurations) {
-        ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService().ifPresent(optional -> optional.getSchemaRuleService().cache(
-                schemaName, ruleConfigurations));
+    private void persistNewVersionConfiguration(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater, 
+                                                final RuleConfiguration currentRuleConfig, 
+                                                final RuleDefinitionAlterPreprocessor preprocessor) {
+        RuleConfiguration toBeAlteredRuleConfig = updater.buildToBeAlteredRuleConfiguration(sqlStatement);
+        RuleConfiguration alteredRuleConfig = preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
+        updater.updateCurrentRuleConfiguration(alteredRuleConfig, toBeAlteredRuleConfig);
+        Collection<RuleConfiguration> alteredConfigs = new LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
+        alteredConfigs.remove(currentRuleConfig);
+        alteredConfigs.add(alteredRuleConfig);
+        // TODO persist altered configs to new schema version
     }
 }