You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/02/18 12:22:09 UTC

[shardingsphere] branch master updated: Add active version changed listener (#15504)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d859e  Add active version changed listener (#15504)
36d859e is described below

commit 36d859e73b30d915f1c487f8f624cc1b73b745da
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Feb 18 20:21:14 2022 +0800

    Add active version changed listener (#15504)
---
 .../mode/manager/ContextManager.java               | 47 ++++++++++++++++++----
 .../persist/service/SchemaBasedPersistService.java |  9 +++++
 .../service/impl/DataSourcePersistService.java     |  6 +++
 .../service/impl/SchemaRulePersistService.java     |  7 ++++
 .../ClusterContextManagerCoordinator.java          | 16 ++++++++
 .../event/version/SchemaVersionChangedEvent.java   | 34 ++++++++++++++++
 .../metadata/watcher/MetaDataChangedWatcher.java   |  4 ++
 7 files changed, 116 insertions(+), 7 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 482ce4a..4807cd9 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -275,19 +275,29 @@ public final class ContextManager implements AutoCloseable {
     public void alterDataSourceConfiguration(final String schemaName, final Map<String, DataSourceProperties> dataSourcePropsMap) {
         try {
             MetaDataContexts changedMetaDataContext = buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName), dataSourcePropsMap);
-            metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
-            Map<String, ShardingSphereMetaData> metaDataMap = new HashMap<>(metaDataContexts.getMetaDataMap());
-            metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
-            Collection<DataSource> pendingClosedDataSources = getPendingClosedDataSources(schemaName, dataSourcePropsMap);
-            renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
-            renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
-            closeDataSources(schemaName, pendingClosedDataSources);
+            refreshMetaDataContext(schemaName, changedMetaDataContext, dataSourcePropsMap);
         } catch (final SQLException ex) {
             log.error("Alter schema:{} data source configuration failed", schemaName, ex);
         }
     }
     
     /**
+     * Alter data source and rule configuration.
+     * 
+     * @param schemaName schema name
+     * @param dataSourcePropsMap data source props map
+     * @param ruleConfigs rule configurations
+     */
+    public void alterDataSourceAndRuleConfiguration(final String schemaName, final Map<String, DataSourceProperties> dataSourcePropsMap, final Collection<RuleConfiguration> ruleConfigs) {
+        try {
+            MetaDataContexts changedMetaDataContext = buildChangedMetaDataContextWithChangedDataSourceAndRule(metaDataContexts.getMetaDataMap().get(schemaName), dataSourcePropsMap, ruleConfigs);
+            refreshMetaDataContext(schemaName, changedMetaDataContext, dataSourcePropsMap);
+        } catch (SQLException ex) {
+            log.error("Alter schema:{} data source and rule configuration failed", schemaName, ex);
+        }
+    }
+    
+    /**
      * Alter global rule configuration.
      * 
      * @param ruleConfigurations global rule configuration
@@ -428,6 +438,16 @@ public final class ContextManager implements AutoCloseable {
         renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
     }
     
+    private void refreshMetaDataContext(final String schemaName, final MetaDataContexts changedMetaDataContext, final Map<String, DataSourceProperties> dataSourcePropsMap) {
+        metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
+        Map<String, ShardingSphereMetaData> metaDataMap = new HashMap<>(metaDataContexts.getMetaDataMap());
+        metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
+        Collection<DataSource> pendingClosedDataSources = getPendingClosedDataSources(schemaName, dataSourcePropsMap);
+        renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
+        renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
+        closeDataSources(schemaName, pendingClosedDataSources);
+    }
+    
     private MetaDataContexts buildChangedMetaDataContextWithAddedDataSource(final ShardingSphereMetaData originalMetaData, 
                                                                             final Map<String, DataSourceProperties> addedDataSourceProps) throws SQLException {
         Map<String, DataSource> dataSourceMap = new HashMap<>(originalMetaData.getResource().getDataSources());
@@ -463,6 +483,19 @@ public final class ContextManager implements AutoCloseable {
         return metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
     }
     
+    private MetaDataContexts buildChangedMetaDataContextWithChangedDataSourceAndRule(final ShardingSphereMetaData originalMetaData, final Map<String, DataSourceProperties> newDataSourceProps, 
+                                                                                     final Collection<RuleConfiguration> ruleConfigs) throws SQLException {
+        Collection<String> deletedDataSources = getDeletedDataSources(originalMetaData, newDataSourceProps).keySet();
+        Map<String, DataSource> changedDataSources = buildChangedDataSources(originalMetaData, newDataSourceProps);
+        Properties props = metaDataContexts.getProps().getProps();
+        MetaDataContextsBuilder metaDataContextsBuilder = new MetaDataContextsBuilder(metaDataContexts.getGlobalRuleMetaData().getConfigurations(), props);
+        metaDataContextsBuilder.addSchema(originalMetaData.getName(), new DataSourceProvidedSchemaConfiguration(getNewDataSources(originalMetaData.getResource().getDataSources(), 
+                getAddedDataSources(originalMetaData, newDataSourceProps), changedDataSources, deletedDataSources), ruleConfigs), props);
+        metaDataContexts.getMetaDataPersistService().ifPresent(
+            optional -> optional.getSchemaMetaDataService().persist(originalMetaData.getName(), metaDataContextsBuilder.getSchemaMap().get(originalMetaData.getName())));
+        return metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
+    }
+    
     private Map<String, DataSource> getNewDataSources(final Map<String, DataSource> originalDataSources,
                                                       final Map<String, DataSource> addedDataSources, final Map<String, DataSource> changedDataSources, final Collection<String> deletedDataSources) {
         Map<String, DataSource> result = new LinkedHashMap<>(originalDataSources);
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
index 9fc8617..cb9e852 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
@@ -59,6 +59,15 @@ public interface SchemaBasedPersistService<T> {
     T load(String schemaName);
     
     /**
+     * Load configurations based version.
+     * 
+     * @param schemaName schema name
+     * @param version version
+     * @return configurations
+     */
+    T load(String schemaName, String version);
+    
+    /**
      * Judge whether schema configuration existed.
      *
      * @param schemaName schema name
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
index 7d229c1..0e191db 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
@@ -72,6 +72,12 @@ public final class DataSourcePersistService implements SchemaBasedPersistService
         return isExisted(schemaName) ? getDataSourceProperties(repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, getSchemaActiveVersion(schemaName)))) : new LinkedHashMap<>();
     }
     
+    @Override
+    public Map<String, DataSourceProperties> load(final String schemaName, final String version) {
+        String yamlContent = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, version));
+        return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() : getDataSourceProperties(yamlContent);
+    }
+    
     @SuppressWarnings("unchecked")
     private Map<String, DataSourceProperties> getDataSourceProperties(final String yamlContent) {
         Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(yamlContent, Map.class);
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
index ae862c6..e8ae771 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
@@ -75,6 +75,13 @@ public final class SchemaRulePersistService implements SchemaBasedPersistService
     }
     
     @Override
+    public Collection<RuleConfiguration> load(final String schemaName, final String version) {
+        String yamlContent = repository.get(SchemaMetaDataNode.getRulePath(schemaName, version));
+        return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(SchemaMetaDataNode
+                .getRulePath(schemaName, getSchemaActiveVersion(schemaName))), Collection.class, true));
+    }
+    
+    @Override
     public boolean isExisted(final String schemaName) {
         return !Strings.isNullOrEmpty(getSchemaActiveVersion(schemaName)) && !Strings.isNullOrEmpty(repository.get(SchemaMetaDataNode.getRulePath(schemaName, getSchemaActiveVersion(schemaName))));
     }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index e7f4b83..c403d1b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
@@ -32,6 +34,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
@@ -46,6 +49,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -223,6 +227,18 @@ public final class ClusterContextManagerCoordinator {
         }
     }
     
+    /**
+     * Renew with new schema version.
+     * 
+     * @param event schema version changed event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaVersionChangedEvent event) {
+        Map<String, DataSourceProperties> dataSourcePropertiesMap = metaDataPersistService.getDataSourceService().load(event.getSchemaName(), event.getActiveVersion());
+        Collection<RuleConfiguration> ruleConfigs = metaDataPersistService.getSchemaRuleService().load(event.getSchemaName(), event.getActiveVersion());
+        contextManager.alterDataSourceAndRuleConfiguration(event.getSchemaName(), dataSourcePropertiesMap, ruleConfigs);
+    }
+    
     private void persistSchema(final String schemaName) {
         if (!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
             metaDataPersistService.getDataSourceService().persist(schemaName, new LinkedHashMap<>());
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
new file mode 100644
index 0000000..cb728ac
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema version changed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class SchemaVersionChangedEvent implements GovernanceEvent {
+    
+    private final String schemaName;
+    
+    private final String activeVersion;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
index e06993f..5580d74 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -99,6 +100,9 @@ public final class MetaDataChangedWatcher implements GovernanceWatcher<Governanc
         if (!schemaName.isPresent() || Strings.isNullOrEmpty(event.getValue())) {
             return Optional.empty();
         }
+        if (event.getKey().equals(SchemaMetaDataNode.getActiveVersionPath(schemaName.get()))) {
+            return Optional.of(new SchemaVersionChangedEvent(schemaName.get(), event.getValue()));
+        }
         Optional<String> schemaVersion = SchemaMetaDataNode.getVersionByDataSourcesPath(event.getKey());
         if (schemaVersion.isPresent()) {
             return Optional.of(createDataSourceChangedEvent(schemaName.get(), schemaVersion.get(), event));