You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/02/18 12:22:09 UTC
[shardingsphere] branch master updated: Add active version changed listener (#15504)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 36d859e Add active version changed listener (#15504)
36d859e is described below
commit 36d859e73b30d915f1c487f8f624cc1b73b745da
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Fri Feb 18 20:21:14 2022 +0800
Add active version changed listener (#15504)
---
.../mode/manager/ContextManager.java | 47 ++++++++++++++++++----
.../persist/service/SchemaBasedPersistService.java | 9 +++++
.../service/impl/DataSourcePersistService.java | 6 +++
.../service/impl/SchemaRulePersistService.java | 7 ++++
.../ClusterContextManagerCoordinator.java | 16 ++++++++
.../event/version/SchemaVersionChangedEvent.java | 34 ++++++++++++++++
.../metadata/watcher/MetaDataChangedWatcher.java | 4 ++
7 files changed, 116 insertions(+), 7 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 482ce4a..4807cd9 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -275,19 +275,29 @@ public final class ContextManager implements AutoCloseable {
public void alterDataSourceConfiguration(final String schemaName, final Map<String, DataSourceProperties> dataSourcePropsMap) {
try {
MetaDataContexts changedMetaDataContext = buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName), dataSourcePropsMap);
- metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
- Map<String, ShardingSphereMetaData> metaDataMap = new HashMap<>(metaDataContexts.getMetaDataMap());
- metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
- Collection<DataSource> pendingClosedDataSources = getPendingClosedDataSources(schemaName, dataSourcePropsMap);
- renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
- renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
- closeDataSources(schemaName, pendingClosedDataSources);
+ refreshMetaDataContext(schemaName, changedMetaDataContext, dataSourcePropsMap);
} catch (final SQLException ex) {
log.error("Alter schema:{} data source configuration failed", schemaName, ex);
}
}
/**
+ * Alter data source and rule configuration.
+ *
+ * @param schemaName schema name
+ * @param dataSourcePropsMap data source props map
+ * @param ruleConfigs rule configurations
+ */
+ public void alterDataSourceAndRuleConfiguration(final String schemaName, final Map<String, DataSourceProperties> dataSourcePropsMap, final Collection<RuleConfiguration> ruleConfigs) {
+ try {
+ MetaDataContexts changedMetaDataContext = buildChangedMetaDataContextWithChangedDataSourceAndRule(metaDataContexts.getMetaDataMap().get(schemaName), dataSourcePropsMap, ruleConfigs);
+ refreshMetaDataContext(schemaName, changedMetaDataContext, dataSourcePropsMap);
+ } catch (SQLException ex) {
+ log.error("Alter schema:{} data source and rule configuration failed", schemaName, ex);
+ }
+ }
+
+ /**
* Alter global rule configuration.
*
* @param ruleConfigurations global rule configuration
@@ -428,6 +438,16 @@ public final class ContextManager implements AutoCloseable {
renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
}
+ private void refreshMetaDataContext(final String schemaName, final MetaDataContexts changedMetaDataContext, final Map<String, DataSourceProperties> dataSourcePropsMap) {
+ metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
+ Map<String, ShardingSphereMetaData> metaDataMap = new HashMap<>(metaDataContexts.getMetaDataMap());
+ metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
+ Collection<DataSource> pendingClosedDataSources = getPendingClosedDataSources(schemaName, dataSourcePropsMap);
+ renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
+ renewTransactionContext(schemaName, metaDataContexts.getMetaData(schemaName).getResource());
+ closeDataSources(schemaName, pendingClosedDataSources);
+ }
+
private MetaDataContexts buildChangedMetaDataContextWithAddedDataSource(final ShardingSphereMetaData originalMetaData,
final Map<String, DataSourceProperties> addedDataSourceProps) throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(originalMetaData.getResource().getDataSources());
@@ -463,6 +483,19 @@ public final class ContextManager implements AutoCloseable {
return metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
}
+ private MetaDataContexts buildChangedMetaDataContextWithChangedDataSourceAndRule(final ShardingSphereMetaData originalMetaData, final Map<String, DataSourceProperties> newDataSourceProps,
+ final Collection<RuleConfiguration> ruleConfigs) throws SQLException {
+ Collection<String> deletedDataSources = getDeletedDataSources(originalMetaData, newDataSourceProps).keySet();
+ Map<String, DataSource> changedDataSources = buildChangedDataSources(originalMetaData, newDataSourceProps);
+ Properties props = metaDataContexts.getProps().getProps();
+ MetaDataContextsBuilder metaDataContextsBuilder = new MetaDataContextsBuilder(metaDataContexts.getGlobalRuleMetaData().getConfigurations(), props);
+ metaDataContextsBuilder.addSchema(originalMetaData.getName(), new DataSourceProvidedSchemaConfiguration(getNewDataSources(originalMetaData.getResource().getDataSources(),
+ getAddedDataSources(originalMetaData, newDataSourceProps), changedDataSources, deletedDataSources), ruleConfigs), props);
+ metaDataContexts.getMetaDataPersistService().ifPresent(
+ optional -> optional.getSchemaMetaDataService().persist(originalMetaData.getName(), metaDataContextsBuilder.getSchemaMap().get(originalMetaData.getName())));
+ return metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
+ }
+
private Map<String, DataSource> getNewDataSources(final Map<String, DataSource> originalDataSources,
final Map<String, DataSource> addedDataSources, final Map<String, DataSource> changedDataSources, final Collection<String> deletedDataSources) {
Map<String, DataSource> result = new LinkedHashMap<>(originalDataSources);
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
index 9fc8617..cb9e852 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
@@ -59,6 +59,15 @@ public interface SchemaBasedPersistService<T> {
T load(String schemaName);
/**
+ * Load configurations based version.
+ *
+ * @param schemaName schema name
+ * @param version version
+ * @return configurations
+ */
+ T load(String schemaName, String version);
+
+ /**
* Judge whether schema configuration existed.
*
* @param schemaName schema name
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
index 7d229c1..0e191db 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
@@ -72,6 +72,12 @@ public final class DataSourcePersistService implements SchemaBasedPersistService
return isExisted(schemaName) ? getDataSourceProperties(repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, getSchemaActiveVersion(schemaName)))) : new LinkedHashMap<>();
}
+ @Override
+ public Map<String, DataSourceProperties> load(final String schemaName, final String version) {
+ String yamlContent = repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, version));
+ return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() : getDataSourceProperties(yamlContent);
+ }
+
@SuppressWarnings("unchecked")
private Map<String, DataSourceProperties> getDataSourceProperties(final String yamlContent) {
Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(yamlContent, Map.class);
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
index ae862c6..e8ae771 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
@@ -75,6 +75,13 @@ public final class SchemaRulePersistService implements SchemaBasedPersistService
}
@Override
+ public Collection<RuleConfiguration> load(final String schemaName, final String version) {
+ String yamlContent = repository.get(SchemaMetaDataNode.getRulePath(schemaName, version));
+ return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(SchemaMetaDataNode
+ .getRulePath(schemaName, getSchemaActiveVersion(schemaName))), Collection.class, true));
+ }
+
+ @Override
public boolean isExisted(final String schemaName) {
return !Strings.isNullOrEmpty(getSchemaActiveVersion(schemaName)) && !Strings.isNullOrEmpty(repository.get(SchemaMetaDataNode.getRulePath(schemaName, getSchemaActiveVersion(schemaName))));
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index e7f4b83..c403d1b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
@@ -32,6 +34,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
@@ -46,6 +49,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Optional;
/**
@@ -223,6 +227,18 @@ public final class ClusterContextManagerCoordinator {
}
}
+ /**
+ * Renew with new schema version.
+ *
+ * @param event schema version changed event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaVersionChangedEvent event) {
+ Map<String, DataSourceProperties> dataSourcePropertiesMap = metaDataPersistService.getDataSourceService().load(event.getSchemaName(), event.getActiveVersion());
+ Collection<RuleConfiguration> ruleConfigs = metaDataPersistService.getSchemaRuleService().load(event.getSchemaName(), event.getActiveVersion());
+ contextManager.alterDataSourceAndRuleConfiguration(event.getSchemaName(), dataSourcePropertiesMap, ruleConfigs);
+ }
+
private void persistSchema(final String schemaName) {
if (!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
metaDataPersistService.getDataSourceService().persist(schemaName, new LinkedHashMap<>());
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
new file mode 100644
index 0000000..cb728ac
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema version changed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class SchemaVersionChangedEvent implements GovernanceEvent {
+
+ private final String schemaName;
+
+ private final String activeVersion;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
index e06993f..5580d74 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -99,6 +100,9 @@ public final class MetaDataChangedWatcher implements GovernanceWatcher<Governanc
if (!schemaName.isPresent() || Strings.isNullOrEmpty(event.getValue())) {
return Optional.empty();
}
+ if (event.getKey().equals(SchemaMetaDataNode.getActiveVersionPath(schemaName.get()))) {
+ return Optional.of(new SchemaVersionChangedEvent(schemaName.get(), event.getValue()));
+ }
Optional<String> schemaVersion = SchemaMetaDataNode.getVersionByDataSourcesPath(event.getKey());
if (schemaVersion.isPresent()) {
return Optional.of(createDataSourceChangedEvent(schemaName.get(), schemaVersion.get(), event));