You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2022/01/12 05:31:03 UTC

[shardingsphere] branch master updated: [DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)

This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0967b84  [DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)
0967b84 is described below

commit 0967b84394c876a196c75eb3e2f05c6c6f6b4e11
Author: lanchengx <52...@users.noreply.github.com>
AuthorDate: Wed Jan 12 13:30:05 2022 +0800

    [DistSQL] Add check for resource in set readwrite splitting read resource syntax (#14685)
    
    * Add check for resource in `set readwrite splitting read resource` syntax
    
    * Fix the `show readwrite splitting rules` syntax to display static configuration, the disabled database is not removed
    
    * Adjust collection type.
    
    * Modify exception information.
    
    * Modify exception information.
    
    * Modify exception information.
---
 .../rule/ReadwriteSplittingDataSourceRule.java     |  24 +++-
 .../rule/ReadwriteSplittingRule.java               |  24 +++-
 .../ReadwriteSplittingRuleQueryResultSet.java      |  30 +++--
 .../distsql/constant/ExportableConstants.java      |   2 +
 .../SetReadwriteSplittingStatusExecutor.java       | 129 ++++++++++++++++++++-
 5 files changed, 186 insertions(+), 23 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
index 0588e16..ecba230 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingDataSourceRule.java
@@ -28,9 +28,9 @@ import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingD
 import org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -117,7 +117,7 @@ public final class ReadwriteSplittingDataSourceRule {
         result.put(name, actualDataSourceNames);
         return result;
     }
-
+    
     /**
      * Get auto aware data sources.
      *
@@ -125,12 +125,28 @@ public final class ReadwriteSplittingDataSourceRule {
      */
     public Map<String, String> getAutoAwareDataSources() {
         Optional<DataSourceNameAware> dataSourceNameAware = DataSourceNameAwareFactory.getInstance().getDataSourceNameAware();
+        Map<String, String> result = new HashMap<>(2, 1);
         if (!Strings.isNullOrEmpty(autoAwareDataSourceName) && dataSourceNameAware.isPresent() && dataSourceNameAware.get().getRule().isPresent()) {
-            Map<String, String> result = new HashMap<>(2, 1);
             result.put(ExportableConstants.PRIMARY_DATA_SOURCE_NAME, dataSourceNameAware.get().getPrimaryDataSourceName(autoAwareDataSourceName));
             result.put(ExportableConstants.REPLICA_DATA_SOURCE_NAMES, String.join(",", dataSourceNameAware.get().getReplicaDataSourceNames(autoAwareDataSourceName)));
             return result;
         }
-        return Collections.emptyMap();
+        return result;
+    }
+    
+    /**
+     * Get data sources.
+     *
+     * @return data sources
+     */
+    public Map<String, String> getDataSources() {
+        Map<String, String> result = new HashMap<>(2, 1);
+        if (Strings.isNullOrEmpty(autoAwareDataSourceName)) {
+            result.put(ExportableConstants.PRIMARY_DATA_SOURCE_NAME, writeDataSourceName);
+            result.put(ExportableConstants.REPLICA_DATA_SOURCE_NAMES, String.join(",",
+                    readDataSourceNames.stream().filter(each -> !disabledDataSourceNames.contains(each)).collect(Collectors.toCollection(LinkedHashSet::new))));
+        }
+        return result;
     }
+    
 }
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index d21a0c2..361bf9f 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -39,6 +39,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -121,17 +122,34 @@ public final class ReadwriteSplittingRule implements SchemaRule, DataSourceConta
         Map<String, Object> result = new HashMap<>(1, 1);
         result.put(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, exportAutoAwareDataSourceMap());
         result.put(ExportableConstants.AUTO_AWARE_DATA_SOURCE_NAME, exportAutoAwareDataSourceNames());
+        result.put(ExportableConstants.DATA_SOURCE_KEY, exportDataSourceNames());
         return result;
     }
-
+    
     private Map<String, Map<String, String>> exportAutoAwareDataSourceMap() {
         Map<String, Map<String, String>> result = new HashMap<>(dataSourceRules.size(), 1);
-        dataSourceRules.forEach((name, dataSourceRule) -> result.put(dataSourceRule.getName(), dataSourceRule.getAutoAwareDataSources()));
+        dataSourceRules.forEach((name, dataSourceRule) -> {
+            Map<String, String> autoAwareDataSources = dataSourceRule.getAutoAwareDataSources();
+            if (!autoAwareDataSources.isEmpty()) {
+                result.put(dataSourceRule.getName(), autoAwareDataSources);
+            }
+        });
+        return result;
+    }
+    
+    private Map<String, Map<String, String>> exportDataSourceNames() {
+        Map<String, Map<String, String>> result = new HashMap<>(dataSourceRules.size(), 1);
+        dataSourceRules.forEach((name, dataSourceRule) -> {
+            Map<String, String> dataSources = dataSourceRule.getDataSources();
+            if (!dataSources.isEmpty()) {
+                result.put(dataSourceRule.getName(), dataSources);
+            }
+        });
         return result;
     }
     
     private Collection<String> exportAutoAwareDataSourceNames() {
-        return dataSourceRules.values().stream().map(ReadwriteSplittingDataSourceRule::getAutoAwareDataSourceName).collect(Collectors.toSet());
+        return dataSourceRules.values().stream().map(ReadwriteSplittingDataSourceRule::getAutoAwareDataSourceName).filter(Objects::nonNull).collect(Collectors.toSet());
     }
     
     @Override
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwrite [...]
index 4f16ce7..ad18209 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-distsql/shardingsphere-readwrite-splitting-distsql-handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/query/ReadwriteSplittingRuleQueryResultSet.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.readwritesplitting.distsql.handler.query;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
 import org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet;
@@ -45,8 +46,10 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
     private Iterator<ReadwriteSplittingDataSourceRuleConfiguration> data;
     
     private Map<String, ShardingSphereAlgorithmConfiguration> loadBalancers;
-
-    private Map<String, Map<String, String>> autoAwareDataSourceMap;
+    
+    private Map<String, Map<String, String>> autoAwareDataSourceMap = Collections.emptyMap();
+    
+    private Map<String, Map<String, String>> dataSourceMap = Collections.emptyMap();
     
     @Override
     public void init(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement) {
@@ -55,10 +58,13 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
         data = ruleConfig.map(optional -> optional.getDataSources().iterator()).orElse(Collections.emptyIterator());
         loadBalancers = ruleConfig.map(ReadwriteSplittingRuleConfiguration::getLoadBalancers).orElse(Collections.emptyMap());
         Optional<ExportableRule> exportableRule = metaData.getRuleMetaData().getRules().stream()
-                .filter(each -> each instanceof ExportableRule && ((ExportableRule) each).export().containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY))
-                .map(each -> (ExportableRule) each).findAny();
-        autoAwareDataSourceMap = (Map<String, Map<String, String>>) exportableRule.map(optional -> optional.export().get(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY))
-                .orElse(Collections.emptyMap());
+                .filter(each -> each instanceof ExportableRule).map(each -> (ExportableRule) each)
+                .filter(each -> each.export().containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY)).findAny();
+        exportableRule.ifPresent(op -> {
+            Map<String, Object> exportedReadwriteRules = op.export();
+            autoAwareDataSourceMap = (Map<String, Map<String, String>>) exportedReadwriteRules.getOrDefault(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, Collections.emptyMap());
+            dataSourceMap = (Map<String, Map<String, String>>) exportedReadwriteRules.getOrDefault(ExportableConstants.DATA_SOURCE_KEY, Collections.emptyMap());
+        });
     }
     
     @Override
@@ -77,12 +83,14 @@ public final class ReadwriteSplittingRuleQueryResultSet implements DistSQLResult
         Optional<ShardingSphereAlgorithmConfiguration> configuration = Optional.ofNullable(loadBalancers.get(ruleConfig.getLoadBalancerName()));
         String writeDataSourceName = ruleConfig.getWriteDataSourceName();
         String readDataSourceNames = Joiner.on(",").join(ruleConfig.getReadDataSourceNames());
-        if (null != ruleConfig.getAutoAwareDataSourceName()) {
-            writeDataSourceName = autoAwareDataSourceMap.get(ruleConfig.getName()).get(ExportableConstants.PRIMARY_DATA_SOURCE_NAME);
-            readDataSourceNames = autoAwareDataSourceMap.get(ruleConfig.getName()).get(ExportableConstants.REPLICA_DATA_SOURCE_NAMES);
+        Map<String, String> exportDataSources = !Strings.isNullOrEmpty(ruleConfig.getAutoAwareDataSourceName())
+                ? autoAwareDataSourceMap.get(ruleConfig.getName()) : dataSourceMap.get(ruleConfig.getName());
+        if (null != exportDataSources && !exportDataSources.isEmpty()) {
+            writeDataSourceName = exportDataSources.get(ExportableConstants.PRIMARY_DATA_SOURCE_NAME);
+            readDataSourceNames = exportDataSources.get(ExportableConstants.REPLICA_DATA_SOURCE_NAMES);
         }
-        return Arrays.asList(ruleConfig.getName(), ruleConfig.getAutoAwareDataSourceName(), writeDataSourceName, readDataSourceNames, 
-                configuration.map(ShardingSphereAlgorithmConfiguration::getType).orElse(null), 
+        return Arrays.asList(ruleConfig.getName(), ruleConfig.getAutoAwareDataSourceName(), writeDataSourceName, readDataSourceNames,
+                configuration.map(ShardingSphereAlgorithmConfiguration::getType).orElse(null),
                 PropertiesConverter.convert(configuration.map(ShardingSphereAlgorithmConfiguration::getProps).orElseGet(Properties::new)));
     }
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
index 32bd83c..ea87cb9 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/distsql/constant/ExportableConstants.java
@@ -35,4 +35,6 @@ public final class ExportableConstants {
     public static final String PRIMARY_DATA_SOURCE_NAME = "primary_data_source_name";
     
     public static final String REPLICA_DATA_SOURCE_NAMES = "replica_data_source_names";
+    
+    public static final String DATA_SOURCE_KEY = "data_source_key";
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
index 7190211..5d641bc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetReadwriteSplittingStatusExecutor.java
@@ -18,11 +18,16 @@
 package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.excutor;
 
 import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
 import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
 import org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.exception.SchemaNotExistedException;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.identifier.type.ExportableRule;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.StorageNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
@@ -30,9 +35,19 @@ import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResp
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.SetStatementExecutor;
 import org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.status.SetReadwriteSplittingStatusStatement;
+import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Set readwrite-splitting status executor.
@@ -40,6 +55,8 @@ import java.util.Collections;
 @AllArgsConstructor
 public final class SetReadwriteSplittingStatusExecutor implements SetStatementExecutor {
     
+    private static final String DISABLE = "DISABLE";
+    
     private final SetReadwriteSplittingStatusStatement sqlStatement;
     
     private final ConnectionSession connectionSession;
@@ -47,16 +64,118 @@ public final class SetReadwriteSplittingStatusExecutor implements SetStatementEx
     @Override
     public ResponseHeader execute() throws DistSQLException {
         String schemaName = sqlStatement.getSchema().isPresent() ? sqlStatement.getSchema().get().getIdentifier().getValue() : connectionSession.getSchemaName();
+        String resourceName = sqlStatement.getResourceName();
+        checkSchema(schemaName);
+        boolean isDisable = DISABLE.equals(sqlStatement.getStatus());
+        if (isDisable) {
+            checkDisablingIsValid(schemaName, resourceName);
+        } else {
+            checkEnablingIsValid(schemaName, resourceName);
+        }
+        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, resourceName, isDisable));
+        return new UpdateResponseHeader(sqlStatement);
+    }
+    
+    private void checkSchema(final String schemaName) {
         if (null == schemaName) {
             throw new NoDatabaseSelectedException();
         }
         if (!ProxyContext.getInstance().getAllSchemaNames().contains(schemaName)) {
             throw new SchemaNotExistedException(schemaName);
         }
-        String resourceName = sqlStatement.getResourceName();
-        Collection<String> notExistedResources = ProxyContext.getInstance().getMetaData(schemaName).getResource().getNotExistedResources(Collections.singleton(resourceName));
-        DistSQLException.predictionThrow(notExistedResources.isEmpty(), new RequiredResourceMissedException(schemaName, Collections.singleton(resourceName)));
-        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, resourceName, "DISABLE".equals(sqlStatement.getStatus())));
-        return new UpdateResponseHeader(sqlStatement);
+    }
+    
+    private void checkEnablingIsValid(final String schemaName, final String toBeEnabledResource) throws DistSQLException {
+        checkResourceExists(schemaName, toBeEnabledResource);
+        Collection<String> disabledResources = getDisabledResources(schemaName);
+        if (!disabledResources.contains(toBeEnabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is not disabled", toBeEnabledResource));
+        }
+    }
+    
+    private void checkDisablingIsValid(final String schemaName, final String toBeDisabledResource) throws DistSQLException {
+        checkResourceExists(schemaName, toBeDisabledResource);
+        Collection<String> disabledResources = getDisabledResources(schemaName);
+        if (disabledResources.contains(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` has been disabled", toBeDisabledResource));
+        }
+        Map<String, Map<String, String>> readwriteSplittingRules = getExportedReadwriteSplittingRules(schemaName);
+        Map<String, String> primaryResources = new HashMap<>();
+        Map<String, String> replicaResources = new HashMap<>();
+        readwriteSplittingRules.entrySet().stream().filter(entry -> !entry.getValue().isEmpty())
+                .peek(entry -> addPrimaryResource(primaryResources, entry)).forEach(entry -> addReplicaResource(replicaResources, entry));
+        if (primaryResources.containsKey(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is the primary resource in the `%s` rule, cannot be disabled", 
+                    toBeDisabledResource, primaryResources.get(toBeDisabledResource)));
+        }
+        if (!replicaResources.containsKey(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is not used by any readwrite-splitting rule, cannot be disabled", toBeDisabledResource));
+        }
+        Set<String> canBeDisabledResources = getCanBeDisabledResources(replicaResources, disabledResources);
+        if (!canBeDisabledResources.contains(toBeDisabledResource)) {
+            throw new UnsupportedOperationException(String.format("`%s` is the last read resource in `%s`, cannot be disabled", toBeDisabledResource, replicaResources.get(toBeDisabledResource)));
+        }
+    }
+    
+    private Collection<String> getDisabledResources(final String schemaName) {
+        Optional<MetaDataPersistService> persistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+        List<String> result = new ArrayList<>();
+        persistService.ifPresent(op -> result.addAll(getStorageNodeStatusData(op).stream().filter(each -> isCurrentSchema(schemaName, each)).map(this::getResourceName).collect(Collectors.toSet())));
+        return result;
+    }
+    
+    private Collection<String> getStorageNodeStatusData(final MetaDataPersistService persistService) {
+        return persistService.getRepository().getChildrenKeys(StorageStatusNode.getStatusPath(StorageNodeStatus.DISABLE));
+    }
+    
+    private Map<String, Map<String, String>> getExportedReadwriteSplittingRules(final String schemaName) {
+        Map<String, Map<String, String>> readwriteSplittingRules = new HashMap<>();
+        ProxyContext.getInstance().getMetaData(schemaName).getRuleMetaData().findRules(ReadwriteSplittingRule.class).stream().findAny()
+                .map(each -> ((ExportableRule) each).export())
+                .filter(each -> each.containsKey(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY) || each.containsKey(ExportableConstants.DATA_SOURCE_KEY))
+                .ifPresent(each -> {
+                    readwriteSplittingRules.putAll((Map) each.getOrDefault(ExportableConstants.AUTO_AWARE_DATA_SOURCE_KEY, Collections.emptyMap()));
+                    readwriteSplittingRules.putAll((Map) each.getOrDefault(ExportableConstants.DATA_SOURCE_KEY, Collections.emptyMap()));
+                });
+        return readwriteSplittingRules;
+    }
+    
+    private Set<String> getCanBeDisabledResources(final Map<String, String> replicaResources, final Collection<String> haveBeenDisabledResources) {
+        Set<String> onlyOneResourceRules = replicaResources.values().stream().map(schemaName -> Arrays.stream(schemaName.split(",")).collect(Collectors.toMap(each -> each, each -> 1)).entrySet())
+                .flatMap(Collection::stream).collect(Collectors.toMap(Entry::getKey, Entry::getValue, Integer::sum)).entrySet().stream()
+                .filter(entry -> entry.getValue() <= 1).map(Entry::getKey).collect(Collectors.toSet());
+        return replicaResources.entrySet().stream().filter(entry -> !haveBeenDisabledResources.contains(entry.getKey()))
+                .filter(entry -> onlyOneResourceRules.stream().noneMatch(each -> Arrays.asList(entry.getValue().split(",")).contains(each))).map(Entry::getKey).collect(Collectors.toSet());
+    }
+    
+    private void checkResourceExists(final String schemaName, final String toBeDisabledResource) throws DistSQLException {
+        Collection<String> notExistedResources = ProxyContext.getInstance().getMetaData(schemaName).getResource().getNotExistedResources(Collections.singleton(toBeDisabledResource));
+        DistSQLException.predictionThrow(notExistedResources.isEmpty(), new RequiredResourceMissedException(schemaName, Collections.singleton(toBeDisabledResource)));
+    }
+    
+    private void addPrimaryResource(final Map<String, String> primaryResources, final Entry<String, Map<String, String>> entry) {
+        entry.getValue().entrySet().stream().filter(entry1 -> ExportableConstants.PRIMARY_DATA_SOURCE_NAME.equals(entry1.getKey()))
+                .forEach(entry1 -> put(primaryResources, entry1.getValue(), entry.getKey()));
+    }
+    
+    private void addReplicaResource(final Map<String, String> replicaResources, final Entry<String, Map<String, String>> entry) {
+        entry.getValue().entrySet().stream().filter(entry1 -> ExportableConstants.REPLICA_DATA_SOURCE_NAMES.equals(entry1.getKey()))
+                .map(entry1 -> Arrays.asList(entry1.getValue().split(","))).flatMap(Collection::stream).forEach(each -> put(replicaResources, each, entry.getKey()));
+    }
+    
+    private boolean isCurrentSchema(final String schemaName, final String nodeData) {
+        return schemaName.equals(nodeData.split("\\.")[0]);
+    }
+    
+    private String getResourceName(final String nodeData) {
+        return nodeData.split("\\.")[1];
+    }
+    
+    private void put(final Map<String, String> map, final String key, final String value) {
+        if (map.containsKey(key)) {
+            map.put(key, String.join(",", map.get(key), value));
+        } else {
+            map.put(key, value);
+        }
     }
 }