You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2022/01/12 05:31:03 UTC
[shardingsphere] branch master updated: [DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)
This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0967b84 [DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)
0967b84 is described below
commit 0967b84394c876a196c75eb3e2f05c6c6f6b4e11
Author: lanchengx <52...@users.noreply.github.com>
AuthorDate: Wed Jan 12 13:30:05 2022 +0800
[DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)
* Add check for resource in `set readwrite splitting read resource` syntax
* Fix the `show readwrite splitting rules` syntax to display static configuration, the disabled database is not removed
* Adjust collection type.
* Modify exception information.
* Modify exception information.
* Modify exception information.
---
.../rule/ReadwriteSplittingDataSourceRule.java | 24 +++-
.../rule/ReadwriteSplittingRule.java | 24 +++-
.../ReadwriteSplittingRuleQueryResultSet.java | 30 +++--
.../distsql/constant/ExportableConstants.java | 2 +
.../SetReadwriteSplittingStatusExecutor.java | 129 ++++++++++++++++++++-
5 files changed, 186 insertions(+), 23 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
index 0588e16..ecba230 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
@@ -28,9 +28,9 @@ import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingD
import org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -117,7 +117,7 @@ public final class ReadwriteSplittingDataSourceRule {
result.put(name, actualDataSourceNames);
return result;
}
-
+
/**
* Get auto aware data sources.
*
@@ -125,12 +125,28 @@ public final class ReadwriteSplittingDataSourceRule {
*/
public Map<String, String> getAutoAwareDataSources() {
Optional<DataSourceNameAware> dataSourceNameAware = DataSourceNameAwareFactory.getInstance().getDataSourceNameAware();
+ Map<String, String> result = new HashMap<>(2, 1);
if (!Strings.isNullOrEmpty(autoAwareDataSourceName) && dataSourceNameAware.isPresent() && dataSourceNameAware.get().getRule().isPresent()) {
- Map<String, String> result = new HashMap<>(2, 1);
result.put(ExportableConstants.PRIMARY_DATA_SOURCE_NAME, dataSourceNameAware.get().getPrimaryDataSourceName(autoAwareDataSourceName));
result.put(ExportableConstants.REPLICA_DATA_SOURCE_NAMES, String.join(",", dataSourceNameAware.get().getReplicaDataSourceNames(autoAwareDataSourceName)));
return result;
}
- return Collections.emptyMap();
+ return result;
+ }
+
+    /**
+     * Get the statically configured data sources of this rule.
+     *
+     * <p>The map is only populated when no auto aware data source name is set. Read data source
+     * names contained in the disabled data source names are left out of the replica list.</p>
+     *
+     * @return write data source name and comma-joined read data source names, or an empty map
+     */
+    public Map<String, String> getDataSources() {
+        Map<String, String> result = new HashMap<>(2, 1);
+        if (!Strings.isNullOrEmpty(autoAwareDataSourceName)) {
+            return result;
+        }
+        // LinkedHashSet keeps the first-seen order while dropping repeated names.
+        Collection<String> enabledReadDataSourceNames = readDataSourceNames.stream()
+                .filter(each -> !disabledDataSourceNames.contains(each)).collect(Collectors.toCollection(LinkedHashSet::new));
+        result.put(ExportableConstants.PRIMARY_DATA_SOURCE_NAME, writeDataSourceName);
+        result.put(ExportableConstants.REPLICA_DATA_SOURCE_NAMES, String.join(",", enabledReadDataSourceNames));
+        return result;
}
+
}
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index d21a0c2..361bf9f 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -39,6 +39,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -121,17 +122,34 @@ public final class ReadwriteSplittingRule implements SchemaRule, DataSourceConta
Map<String, Object> result = new HashMap<>(1, 1);
result.put(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, exportAutoAwareDataSourceMap());
result.put(ExportableConstants.AUTO_AWARE_DATA_SOURCE_NAME, exportAutoAwareDataSourceNames());
+ result.put(ExportableConstants.DATA_SOURCE_KEY, exportDataSourceNames());
return result;
}
-
+
private Map<String, Map<String, String>> exportAutoAwareDataSourceMap() {
Map<String, Map<String, String>> result = new HashMap<>(dataSourceRules.size(), 1);
- dataSourceRules.forEach((name, dataSourceRule) -> result.put(dataSourceRule.getName(), dataSourceRule.getAutoAwareDataSources()));
+ dataSourceRules.forEach((name, dataSourceRule) -> {
+ Map<String, String> autoAwareDataSources = dataSourceRule.getAutoAwareDataSources();
+ if (!autoAwareDataSources.isEmpty()) {
+ result.put(dataSourceRule.getName(), autoAwareDataSources);
+ }
+ });
+ return result;
+ }
+
+    /**
+     * Collect the non-empty data source maps of every data source rule, keyed by rule name.
+     *
+     * @return rule name to exported data sources
+     */
+    private Map<String, Map<String, String>> exportDataSourceNames() {
+        Map<String, Map<String, String>> result = new HashMap<>(dataSourceRules.size(), 1);
+        for (ReadwriteSplittingDataSourceRule each : dataSourceRules.values()) {
+            Map<String, String> exported = each.getDataSources();
+            if (exported.isEmpty()) {
+                // Rules that export nothing are not listed at all.
+                continue;
+            }
+            result.put(each.getName(), exported);
+        }
return result;
}
private Collection<String> exportAutoAwareDataSourceNames() {
- return dataSourceRules.values().stream().map(ReadwriteSplittingDataSourceRule::getAutoAwareDataSourceName).collect(Collectors.toSet());
+ return dataSourceRules.values().stream().map(ReadwriteSplittingDataSourceRule::getAutoAwareDataSourceName).filter(Objects::nonNull).collect(Collectors.toSet());
}
@Override
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java
index 4f16ce7..ad18209 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.readwritesplitting.distsql.handler.query;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
import org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet;
@@ -45,8 +46,10 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
private Iterator<ReadwriteSplittingDataSourceRuleConfiguration> data;
private Map<String, ShardingSphereAlgorithmConfiguration> loadBalancers;
-
- private Map<String, Map<String, String>> autoAwareDataSourceMap;
+
+ private Map<String, Map<String, String>> autoAwareDataSourceMap = Collections.emptyMap();
+
+ private Map<String, Map<String, String>> dataSourceMap = Collections.emptyMap();
@Override
public void init(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement) {
@@ -55,10 +58,13 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
data = ruleConfig.map(optional -> optional.getDataSources().iterator()).orElse(Collections.emptyIterator());
loadBalancers = ruleConfig.map(ReadwriteSplittingRuleConfiguration::getLoadBalancers).orElse(Collections.emptyMap());
Optional<ExportableRule> exportableRule = metaData.getRuleMetaData().getRules().stream()
- .filter(each -> each instanceof ExportableRule && ((ExportableRule) each).export().containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY))
- .map(each -> (ExportableRule) each).findAny();
- autoAwareDataSourceMap = (Map<String, Map<String, String>>) exportableRule.map(optional -> optional.export().get(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY))
- .orElse(Collections.emptyMap());
+ .filter(each -> each instanceof ExportableRule).map(each -> (ExportableRule) each)
+ .filter(each -> each.export().containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY)).findAny();
+ exportableRule.ifPresent(op -> {
+ Map<String, Object> exportedReadwriteRules = op.export();
+ autoAwareDataSourceMap = (Map<String, Map<String, String>>) exportedReadwriteRules.getOrDefault(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, Collections.emptyMap());
+ dataSourceMap = (Map<String, Map<String, String>>) exportedReadwriteRules.getOrDefault(ExportableConstants.DATA_SOURCE_KEY, Collections.emptyMap());
+ });
}
@Override
@@ -77,12 +83,14 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
Optional<ShardingSphereAlgorithmConfiguration> configuration = Optional.ofNullable(loadBalancers.get(ruleConfig.getLoadBalancerName()));
String writeDataSourceName = ruleConfig.getWriteDataSourceName();
String readDataSourceNames = Joiner.on(",").join(ruleConfig.getReadDataSourceNames());
- if (null != ruleConfig.getAutoAwareDataSourceName()) {
- writeDataSourceName = autoAwareDataSourceMap.get(ruleConfig.getName()).get(ExportableConstants.PRIMARY_DATA_SOURCE_NAME);
- readDataSourceNames = autoAwareDataSourceMap.get(ruleConfig.getName()).get(ExportableConstants.REPLICA_DATA_SOURCE_NAMES);
+ Map<String, String> exportDataSources = !Strings.isNullOrEmpty(ruleConfig.getAutoAwareDataSourceName())
+ ? autoAwareDataSourceMap.get(ruleConfig.getName()) : dataSourceMap.get(ruleConfig.getName());
+ if (null != exportDataSources && !exportDataSources.isEmpty()) {
+ writeDataSourceName = exportDataSources.get(ExportableConstants.PRIMARY_DATA_SOURCE_NAME);
+ readDataSourceNames = exportDataSources.get(ExportableConstants.REPLICA_DATA_SOURCE_NAMES);
}
- return Arrays.asList(ruleConfig.getName(), ruleConfig.getAutoAwareDataSourceName(), writeDataSourceName, readDataSourceNames,
- configuration.map(ShardingSphereAlgorithmConfiguration::getType).orElse(null),
+ return Arrays.asList(ruleConfig.getName(), ruleConfig.getAutoAwareDataSourceName(), writeDataSourceName, readDataSourceNames,
+ configuration.map(ShardingSphereAlgorithmConfiguration::getType).orElse(null),
PropertiesConverter.convert(configuration.map(ShardingSphereAlgorithmConfiguration::getProps).orElseGet(Properties::new)));
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
index 32bd83c..ea87cb9 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
@@ -35,4 +35,6 @@ public final class ExportableConstants {
public static final String PRIMARY_DATA_SOURCE_NAME = "primary_data_source_name";
public static final String REPLICA_DATA_SOURCE_NAMES = "replica_data_source_names";
+
+ public static final String DATA_SOURCE_KEY = "data_source_key";
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
index 7190211..5d641bc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
@@ -18,11 +18,16 @@
package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.excutor;
import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
import org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.exception.SchemaNotExistedException;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.identifier.type.ExportableRule;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
@@ -30,9 +35,19 @@ import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResp
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.SetStatementExecutor;
import org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.status.SetReadwriteSplittingStatusStatement;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Set readwrite-splitting status executor.
@@ -40,6 +55,8 @@ import java.util.Collections;
@AllArgsConstructor
public final class SetReadwriteSplittingStatusExecutor implements SetStatementExecutor {
+ private static final String DISABLE = "DISABLE";
+
private final SetReadwriteSplittingStatusStatement sqlStatement;
private final ConnectionSession connectionSession;
@@ -47,16 +64,118 @@ public final class SetReadwriteSplittingStatusExecutor implements SetStatementEx
@Override
public ResponseHeader execute() throws DistSQLException {
String schemaName = sqlStatement.getSchema().isPresent() ? sqlStatement.getSchema().get().getIdentifier().getValue() : connectionSession.getSchemaName();
+        String resourceName = sqlStatement.getResourceName();
+        checkSchema(schemaName);
+        // Only the exact status text "DISABLE" disables the resource; every other status is validated as an enable request.
+        boolean isDisable = DISABLE.equals(sqlStatement.getStatus());
+        if (!isDisable) {
+            checkEnablingIsValid(schemaName, resourceName);
+        } else {
+            checkDisablingIsValid(schemaName, resourceName);
+        }
+        // The event is posted only after the validation above has passed.
+        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, resourceName, isDisable));
+        return new UpdateResponseHeader(sqlStatement);
+    }
+
+ private void checkSchema(final String schemaName) {
if (null == schemaName) {
throw new NoDatabaseSelectedException();
}
if (!ProxyContext.getInstance().getAllSchemaNames().contains(schemaName)) {
throw new SchemaNotExistedException(schemaName);
}
- String resourceName = sqlStatement.getResourceName();
- Collection<String> notExistedResources = ProxyContext.getInstance().getMetaData(schemaName).getResource().getNotExistedResources(Collections.singleton(resourceName));
- DistSQLException.predictionThrow(notExistedResources.isEmpty(), new RequiredResourceMissedException(schemaName, Collections.singleton(resourceName)));
- ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, resourceName, "DISABLE".equals(sqlStatement.getStatus())));
- return new UpdateResponseHeader(sqlStatement);
+ }
+
+    /**
+     * Check that a resource can be enabled: it must exist in the schema and currently be disabled.
+     *
+     * @param schemaName schema name
+     * @param toBeEnabledResource name of the resource to be enabled
+     * @throws DistSQLException when the resource does not exist in the schema
+     */
+    private void checkEnablingIsValid(final String schemaName, final String toBeEnabledResource) throws DistSQLException {
+        checkResourceExists(schemaName, toBeEnabledResource);
+        boolean isDisabled = getDisabledResources(schemaName).contains(toBeEnabledResource);
+        if (isDisabled) {
+            return;
+        }
+        throw new UnsupportedOperationException(String.format("`%s` is not disabled", toBeEnabledResource));
+    }
+
+    /**
+     * Check that a resource can be disabled.
+     *
+     * <p>The resource must exist in the schema, must not already be disabled, must not be the primary
+     * resource of any exported readwrite-splitting rule, must be used as a read resource by at least
+     * one rule, and must not be the last read resource of a rule.</p>
+     *
+     * @param schemaName schema name
+     * @param toBeDisabledResource name of the resource to be disabled
+     * @throws DistSQLException when the resource does not exist in the schema
+     */
+    private void checkDisablingIsValid(final String schemaName, final String toBeDisabledResource) throws DistSQLException {
+        checkResourceExists(schemaName, toBeDisabledResource);
+        Collection<String> disabledResources = getDisabledResources(schemaName);
+        if (disabledResources.contains(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` has been disabled", toBeDisabledResource));
+        }
+        Map<String, Map<String, String>> readwriteSplittingRules = getExportedReadwriteSplittingRules(schemaName);
+        // Both maps go from resource name to the comma-joined names of the rules using it.
+        Map<String, String> primaryResources = new HashMap<>();
+        Map<String, String> replicaResources = new HashMap<>();
+        // A plain loop instead of Stream.peek: filling the maps is a required side effect, not a debugging aid.
+        for (Entry<String, Map<String, String>> entry : readwriteSplittingRules.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                continue;
+            }
+            addPrimaryResource(primaryResources, entry);
+            addReplicaResource(replicaResources, entry);
+        }
+        if (primaryResources.containsKey(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is the primary resource in the `%s` rule, cannot be disabled",
+                    toBeDisabledResource, primaryResources.get(toBeDisabledResource)));
+        }
+        if (!replicaResources.containsKey(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is not used by any readwrite-splitting rule, cannot be disabled", toBeDisabledResource));
+        }
+        Set<String> canBeDisabledResources = getCanBeDisabledResources(replicaResources, disabledResources);
+        if (!canBeDisabledResources.contains(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is the last read resource in `%s`, cannot be disabled", toBeDisabledResource, replicaResources.get(toBeDisabledResource)));
+        }
+    }
+
+    /**
+     * Get the names of the disabled resources recorded for a schema.
+     *
+     * @param schemaName schema name
+     * @return disabled resource names without duplicates, empty when no persist service is present
+     */
+    private Collection<String> getDisabledResources(final String schemaName) {
+        Optional<MetaDataPersistService> persistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+        List<String> result = new ArrayList<>();
+        if (persistService.isPresent()) {
+            // Keep only the keys of this schema and reduce each to its resource part.
+            Set<String> disabledResourceNames = getStorageNodeStatusData(persistService.get()).stream()
+                    .filter(each -> isCurrentSchema(schemaName, each)).map(this::getResourceName).collect(Collectors.toSet());
+            result.addAll(disabledResourceNames);
+        }
+        return result;
+    }
+
+    /**
+     * Read the children keys stored under the status path of {@code StorageNodeStatus.DISABLE}.
+     *
+     * @param persistService meta data persist service
+     * @return children keys of the disabled status path
+     */
+    private Collection<String> getStorageNodeStatusData(final MetaDataPersistService persistService) {
+        String disabledStatusPath = StorageStatusNode.getStatusPath(StorageNodeStatus.DISABLE);
+        return persistService.getRepository().getChildrenKeys(disabledStatusPath);
+    }
+
+    /**
+     * Merge the auto aware and the static data source maps exported by a readwrite-splitting rule of the schema.
+     *
+     * <p>Static entries are added last, so they replace an auto aware entry with the same rule name.</p>
+     *
+     * @param schemaName schema name
+     * @return rule name to exported data sources, empty when no rule is found or nothing is exported
+     */
+    private Map<String, Map<String, String>> getExportedReadwriteSplittingRules(final String schemaName) {
+        Map<String, Map<String, String>> result = new HashMap<>();
+        Optional<Map<String, Object>> exported = ProxyContext.getInstance().getMetaData(schemaName).getRuleMetaData().findRules(ReadwriteSplittingRule.class).stream().findAny()
+                .map(each -> ((ExportableRule) each).export());
+        if (!exported.isPresent()) {
+            return result;
+        }
+        Map<String, Object> exportedData = exported.get();
+        if (exportedData.containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY) || exportedData.containsKey(ExportableConstants.DATA_SOURCE_KEY)) {
+            result.putAll((Map) exportedData.getOrDefault(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, Collections.emptyMap()));
+            result.putAll((Map) exportedData.getOrDefault(ExportableConstants.DATA_SOURCE_KEY, Collections.emptyMap()));
+        }
+        return result;
+    }
+
+    /**
+     * Get the read resources that may still be disabled.
+     *
+     * <p>A resource qualifies when it has not been disabled yet and none of the rules using it
+     * has that resource as its only read resource.</p>
+     *
+     * @param replicaResources read resource name to the comma-joined names of the rules using it
+     * @param haveBeenDisabledResources names of the resources that are already disabled
+     * @return names of the resources that can be disabled
+     */
+    private Set<String> getCanBeDisabledResources(final Map<String, String> replicaResources, final Collection<String> haveBeenDisabledResources) {
+        // Count how many read resources reference each rule; distinct() keeps a repeated rule name
+        // within one resource from being counted twice (previously this raised a duplicate-key exception).
+        Map<String, Integer> readResourceCountPerRule = new HashMap<>();
+        for (String ruleNames : replicaResources.values()) {
+            Arrays.stream(ruleNames.split(",")).distinct().forEach(each -> readResourceCountPerRule.merge(each, 1, Integer::sum));
+        }
+        Set<String> onlyOneResourceRules = readResourceCountPerRule.entrySet().stream()
+                .filter(entry -> entry.getValue() <= 1).map(Entry::getKey).collect(Collectors.toSet());
+        // NOTE(review): the count covers every entry of replicaResources; it presumably relies on the
+        // exported replica lists already excluding disabled resources - confirm for auto aware rules.
+        return replicaResources.entrySet().stream().filter(entry -> !haveBeenDisabledResources.contains(entry.getKey()))
+                .filter(entry -> Arrays.stream(entry.getValue().split(",")).noneMatch(onlyOneResourceRules::contains)).map(Entry::getKey).collect(Collectors.toSet());
+    }
+
+    /**
+     * Check that the resource exists in the schema.
+     *
+     * @param schemaName schema name
+     * @param toBeDisabledResource name of the resource to look up
+     * @throws DistSQLException when the resource is reported as not existing
+     */
+    private void checkResourceExists(final String schemaName, final String toBeDisabledResource) throws DistSQLException {
+        Set<String> requiredResources = Collections.singleton(toBeDisabledResource);
+        Collection<String> notExistedResources = ProxyContext.getInstance().getMetaData(schemaName).getResource().getNotExistedResources(requiredResources);
+        boolean isExisted = notExistedResources.isEmpty();
+        DistSQLException.predictionThrow(isExisted, new RequiredResourceMissedException(schemaName, requiredResources));
+    }
+
+    /**
+     * Register the primary data source of an exported rule under the rule's name.
+     *
+     * @param primaryResources primary resource name to comma-joined rule names, updated in place
+     * @param entry rule name with its exported data sources
+     */
+    private void addPrimaryResource(final Map<String, String> primaryResources, final Entry<String, Map<String, String>> entry) {
+        String ruleName = entry.getKey();
+        for (Entry<String, String> each : entry.getValue().entrySet()) {
+            if (ExportableConstants.PRIMARY_DATA_SOURCE_NAME.equals(each.getKey())) {
+                put(primaryResources, each.getValue(), ruleName);
+            }
+        }
+    }
+
+    /**
+     * Register every replica data source of an exported rule under the rule's name.
+     *
+     * @param replicaResources replica resource name to comma-joined rule names, updated in place
+     * @param entry rule name with its exported data sources
+     */
+    private void addReplicaResource(final Map<String, String> replicaResources, final Entry<String, Map<String, String>> entry) {
+        String ruleName = entry.getKey();
+        for (Entry<String, String> each : entry.getValue().entrySet()) {
+            if (!ExportableConstants.REPLICA_DATA_SOURCE_NAMES.equals(each.getKey())) {
+                continue;
+            }
+            // The exported value is a comma-joined list of replica names.
+            for (String replicaName : each.getValue().split(",")) {
+                put(replicaResources, replicaName, ruleName);
+            }
+        }
+    }
+
+    /**
+     * Judge whether a storage node key belongs to the given schema.
+     *
+     * <p>The key is split on {@code .} and its first segment is compared with the schema name.
+     * Keys with fewer than two segments are rejected, so a key without a resource part can no
+     * longer trigger an {@link ArrayIndexOutOfBoundsException} here or when its resource name is read.</p>
+     *
+     * @param schemaName schema name
+     * @param nodeData storage node key
+     * @return whether the key has a resource part and its first segment equals the schema name
+     */
+    private boolean isCurrentSchema(final String schemaName, final String nodeData) {
+        String[] segments = nodeData.split("\\.");
+        return segments.length > 1 && schemaName.equals(segments[0]);
+    }
+
+    /**
+     * Extract the second {@code .}-separated segment of a storage node key.
+     *
+     * @param nodeData storage node key
+     * @return second segment of the key
+     */
+    private String getResourceName(final String nodeData) {
+        String[] segments = nodeData.split("\\.");
+        return segments[1];
+    }
+
+    /**
+     * Store a value under a key, appending it with a comma when the key already has a value.
+     *
+     * @param map map to update in place
+     * @param key key to store under
+     * @param value value to store or append; must not be null because {@link Map#merge} rejects null values
+     */
+    private void put(final Map<String, String> map, final String key, final String value) {
+        // Map.merge does the lookup and the update in one step instead of containsKey/get/put.
+        map.merge(key, value, (existing, appended) -> String.join(",", existing, appended));
+    }
}