You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/06 14:17:56 UTC

[shardingsphere] branch master updated: #7318, add data source disable and enable state (#8921)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fde3fb  #7318, add data source disable and enable state (#8921)
8fde3fb is described below

commit 8fde3fb95783576198cc62f73a160cb281746178
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Wed Jan 6 22:17:17 2021 +0800

    #7318, add data source disable and enable state (#8921)
    
    * #7318, add data source disable and enable state
    
    * #7318, add data source disable and enable state
---
 .../org/apache/shardingsphere/ha/spi/HAType.java   |  20 +++-
 .../shardingsphere/ha/rule/HADataSourceRule.java   |   5 +-
 .../org/apache/shardingsphere/ha/rule/HARule.java  |  14 ++-
 .../ha/fixture/TestHATypeFixture.java              |   9 +-
 .../apache/shardingsphere/ha/mgr/MGRHAType.java    | 111 +++++++++++++++++++--
 .../shardingsphere/ha/mgr/MGRPeriodicalJob.java    |  16 ++-
 .../ha/route/fixture/TestRouteHATypeFixture.java   |  10 +-
 .../governance/core/registry/RegistryCenter.java   |  13 +++
 .../rule/event/impl/DataSourceDisabledEvent.java   |  31 ++----
 9 files changed, 182 insertions(+), 47 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index a5fdca0..c5f6c61 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.spi.typed.TypedSPI;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -40,18 +41,29 @@ public interface HAType extends TypedSPI {
     /**
      * Update primary data source.
      *
-     * @param dataSourceMap data source map
+     * @param originalDataSourceMap original data source map
      * @param schemaName schema name
+     * @param disabledDataSourceNames disabled data source names
      */
-    void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName);
+    void updatePrimaryDataSource(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+    
+    /**
+     * Update member state.
+     *
+     * @param originalDataSourceMap original data source map
+     * @param schemaName schema name
+     * @param disabledDataSourceNames disabled data source names
+     */
+    void updateMemberState(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
     
     /**
      * Start periodical update.
      *
-     * @param dataSourceMap data source map
+     * @param originalDataSourceMap original data source map
      * @param schemaName schema name
+     * @param disabledDataSourceNames disabled data source names
      */
-    void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName);
+    void startPeriodicalUpdate(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
     
     /**
      * Stop periodical update.
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
index 5075a56..2796324 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.ha.rule;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.shardingsphere.ha.api.config.rule.HADataSourceRuleConfiguration;
 import org.apache.shardingsphere.ha.spi.ReplicaLoadBalanceAlgorithm;
@@ -46,7 +45,6 @@ public final class HADataSourceRule {
     
     private final boolean replicaQuery;
     
-    @Getter(AccessLevel.NONE)
     private final Collection<String> disabledDataSourceNames = new HashSet<>();
     
     public HADataSourceRule(final HADataSourceRuleConfiguration config, final ReplicaLoadBalanceAlgorithm loadBalancer) {
@@ -92,8 +90,7 @@ public final class HADataSourceRule {
      */
     public Map<String, Collection<String>> getDataSourceMapper() {
         Map<String, Collection<String>> result = new HashMap<>(1, 1);
-        Collection<String> actualDataSourceNames = new LinkedList<>();
-        actualDataSourceNames.addAll(dataSourceNames);
+        Collection<String> actualDataSourceNames = new LinkedList<>(dataSourceNames);
         result.put(name, actualDataSourceNames);
         return result;
     }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index c5c21d7..af46490 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -73,15 +73,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
                     ? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
             dataSourceRules.put(each.getName(), new HADataSourceRule(each, loadBalanceAlgorithm));
         }
+        Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
+        Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
         if (null == haType) {
             haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaConfiguration().getType(), config.getHaConfiguration().getProps());
-            haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } else {
             haType.stopPeriodicalUpdate();
         }
         try {
             haType.checkHAConfig(dataSourceMap, schemaName);
-            haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
@@ -99,15 +102,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
                     ? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
             dataSourceRules.put(each.getName(), new HADataSourceRule(each, loadBalanceAlgorithm));
         }
+        Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
+        Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
         if (null == haType) {
             haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaType().getType(), config.getHaType().getProps());
-            haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } else {
             haType.stopPeriodicalUpdate();
         }
         try {
             haType.checkHAConfig(dataSourceMap, schemaName);
-            haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index 1bd200a..1641fe8 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.fixture;
 import org.apache.shardingsphere.ha.spi.HAType;
 
 import javax.sql.DataSource;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -32,11 +33,15 @@ public final class TestHATypeFixture implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+    public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    }
+    
+    @Override
+    public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
     }
     
     @Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index 6065d2e..b1a2aeb 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -27,12 +27,18 @@ import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration
 import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
 import org.apache.shardingsphere.ha.spi.HAType;
 import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -51,6 +57,8 @@ public final class MGRHAType implements HAType {
     
     private static final String SINGLE_PRIMARY = "SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'";
     
+    private static final String MEMBER_LIST = "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members";
+    
     private static CoordinatorRegistryCenter coordinatorRegistryCenter;
     
     private ScheduleJobBootstrap scheduleJobBootstrap;
@@ -115,11 +123,19 @@ public final class MGRHAType implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
-        String newPrimaryDataSource = determinePrimaryDataSource(dataSourceMap);
+    public void updatePrimaryDataSource(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+        Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+        if (!disabledDataSourceNames.isEmpty()) {
+            activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+        }
+        String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
         if (newPrimaryDataSource.isEmpty()) {
             return;
         }
+        // TODO post primary datasource event
+//        if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
+//             ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceUpdateEvent(schemaName, newPrimaryDataSource, newPrimaryDataSource));
+//        }
         oldPrimaryDataSource = newPrimaryDataSource;
     }
     
@@ -149,8 +165,7 @@ public final class MGRHAType implements HAType {
     private String findPrimaryDataSourceName(final String primaryDataSourceURL, final Map<String, DataSource> dataSourceMap) {
         String result = "";
         for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
-            DataSource dataSource = entry.getValue();
-            try (Connection connection = dataSource.getConnection()) {
+            try (Connection connection = entry.getValue().getConnection()) {
                 if (connection.getMetaData().getURL().contains(primaryDataSourceURL)) {
                     return entry.getKey();
                 }
@@ -162,13 +177,97 @@ public final class MGRHAType implements HAType {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+    public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+        Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+        if (!disabledDataSourceNames.isEmpty()) {
+            activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+        }
+        List<String> memberDataSourceURLs = findMemberDataSourceURLs(activeDataSourceMap);
+        if (memberDataSourceURLs.isEmpty()) {
+            return;
+        }
+        Map<String, String> dataSourceURLs = new HashMap<>(16, 1);
+        determineDisabledDataSource(schemaName, activeDataSourceMap, memberDataSourceURLs, dataSourceURLs);
+        determineEnabledDataSource(originalDataSourceMap, schemaName, memberDataSourceURLs, dataSourceURLs);
+    }
+    
+    private List<String> findMemberDataSourceURLs(final Map<String, DataSource> activeDataSourceMap) {
+        List<String> result = new LinkedList<>();
+        try (Connection connection = activeDataSourceMap.get(oldPrimaryDataSource).getConnection();
+             Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(MEMBER_LIST);
+            while (resultSet.next()) {
+                if (!"ONLINE".equals(resultSet.getString("MEMBER_STATE"))) {
+                    continue;
+                }
+                result.add(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")));
+            }
+        } catch (final SQLException ex) {
+            log.error("An exception occurred while find member data source urls", ex);
+        }
+        return result;
+    }
+    
+    private void determineDisabledDataSource(final String schemaName, final Map<String, DataSource> activeDataSourceMap,
+                                             final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+        for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet()) {
+            boolean disable = true;
+            String url = "";
+            try (Connection connection = entry.getValue().getConnection()) {
+                url = connection.getMetaData().getURL();
+                for (String each : memberDataSourceURLs) {
+                    if (url.contains(each)) {
+                        disable = false;
+                        break;
+                    }
+                }
+            } catch (final SQLException ex) {
+                log.error("An exception occurred while find data source urls", ex);
+            }
+            if (disable) {
+                ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), true));
+            } else if (!"".equals(url)) {
+                dataSourceURLs.put(entry.getKey(), url);
+            }
+        }
+    }
+    
+    private void determineEnabledDataSource(final Map<String, DataSource> originalDataSourceMap, final String schemaName,
+                                            final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+        for (String each : memberDataSourceURLs) {
+            boolean enable = true;
+            for (Entry<String, String> entry : dataSourceURLs.entrySet()) {
+                if (entry.getValue().contains(each)) {
+                    enable = false;
+                    break;
+                }
+            }
+            if (!enable) {
+                continue;
+            }
+            for (Entry<String, DataSource> entry : originalDataSourceMap.entrySet()) {
+                String url;
+                try (Connection connection = entry.getValue().getConnection()) {
+                    url = connection.getMetaData().getURL();
+                    if (null != url && url.contains(each)) {
+                        ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), false));
+                        break;
+                    }
+                } catch (final SQLException ex) {
+                    log.error("An exception occurred while find enable data source urls", ex);
+                }
+            }
+        }
+    }
+    
+    @Override
+    public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
         if (null == coordinatorRegistryCenter) {
             ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(props.getProperty("zkServerLists"), "mgr-elasticjob");
             coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
             coordinatorRegistryCenter.init();
         }
-        scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName),
+        scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, originalDataSourceMap, schemaName, disabledDataSourceNames),
                 JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
         scheduleJobBootstrap.schedule();
     }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
index 4318191..f3cee18 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.ha.spi.HAType;
 
 import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 @RequiredArgsConstructor
@@ -32,14 +34,20 @@ public final class MGRPeriodicalJob implements SimpleJob {
     
     private final HAType haType;
     
-    private final Map<String, DataSource> dataSourceMap;
+    private final Map<String, DataSource> originalDataSourceMap;
     
     private final String schemaName;
     
+    private final Collection<String> disabledDataSourceNames;
+    
     @Override
     public void execute(final ShardingContext shardingContext) {
-        log.info("---------------MGRPeriodicalJob--------------");
-        log.info("dataSourceMap: " + dataSourceMap.toString());
-        haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+        Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+        if (!disabledDataSourceNames.isEmpty()) {
+            activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+        }
+        log.info(" +++ " + activeDataSourceMap.toString());
+        haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+        haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
     }
 }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 204454e..b69a196 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.route.fixture;
 import org.apache.shardingsphere.ha.spi.HAType;
 
 import javax.sql.DataSource;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -32,12 +33,15 @@ public final class TestRouteHATypeFixture implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
-
+    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    }
+    
+    @Override
+    public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+    public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
     }
     
     @Override
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 224379e..459abf0 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -18,10 +18,12 @@
 package org.apache.shardingsphere.governance.core.registry;
 
 import com.google.common.base.Strings;
+import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.governance.core.lock.node.LockNode;
 import org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
 
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +56,17 @@ public final class RegistryCenter {
     }
     
     /**
+     * Persist data source disabled state.
+     *
+     * @param event data source disabled event.
+     */
+    @Subscribe
+    public synchronized void renew(final DataSourceDisabledEvent event) {
+        String value = event.isDisabled() ? RegistryCenterNodeStatus.DISABLED.toString() : "";
+        repository.persist(node.getDataSourcePath(event.getSchemaName(), event.getDataSourceName()), value);
+    }
+    
+    /**
      * Persist instance online.
      */
     public void persistInstanceOnline() {
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
similarity index 55%
copy from shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
copy to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
index 4318191..dff7bed 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
@@ -15,31 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.ha.mgr;
+package org.apache.shardingsphere.infra.rule.event.impl;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.ha.spi.HAType;
-
-import javax.sql.DataSource;
-import java.util.Map;
+import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
 
+/**
+ * Data source disabled event.
+ */
 @RequiredArgsConstructor
-@Slf4j
-public final class MGRPeriodicalJob implements SimpleJob {
-    
-    private final HAType haType;
-    
-    private final Map<String, DataSource> dataSourceMap;
+@Getter
+public final class DataSourceDisabledEvent implements RuleChangedEvent {
     
     private final String schemaName;
     
-    @Override
-    public void execute(final ShardingContext shardingContext) {
-        log.info("---------------MGRPeriodicalJob--------------");
-        log.info("dataSourceMap: " + dataSourceMap.toString());
-        haType.updatePrimaryDataSource(dataSourceMap, schemaName);
-    }
+    private final String dataSourceName;
+    
+    private final boolean isDisabled;
 }