You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/06 14:17:56 UTC
[shardingsphere] branch master updated: #7318,
add data source disable and enable state (#8921)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8fde3fb #7318, add data source disable and enable state (#8921)
8fde3fb is described below
commit 8fde3fb95783576198cc62f73a160cb281746178
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Wed Jan 6 22:17:17 2021 +0800
#7318, add data source disable and enable state (#8921)
* #7318, add data source disable and enable state
* #7318, add data source disable and enable state
---
.../org/apache/shardingsphere/ha/spi/HAType.java | 20 +++-
.../shardingsphere/ha/rule/HADataSourceRule.java | 5 +-
.../org/apache/shardingsphere/ha/rule/HARule.java | 14 ++-
.../ha/fixture/TestHATypeFixture.java | 9 +-
.../apache/shardingsphere/ha/mgr/MGRHAType.java | 111 +++++++++++++++++++--
.../shardingsphere/ha/mgr/MGRPeriodicalJob.java | 16 ++-
.../ha/route/fixture/TestRouteHATypeFixture.java | 10 +-
.../governance/core/registry/RegistryCenter.java | 13 +++
.../rule/event/impl/DataSourceDisabledEvent.java | 31 ++----
9 files changed, 182 insertions(+), 47 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index a5fdca0..c5f6c61 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.spi.typed.TypedSPI;
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Map;
/**
@@ -40,18 +41,29 @@ public interface HAType extends TypedSPI {
/**
* Update primary data source.
*
- * @param dataSourceMap data source map
+ * @param originalDataSourceMap original data source map
* @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
*/
- void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName);
+ void updatePrimaryDataSource(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+
+ /**
+ * Update member state.
+ *
+ * @param originalDataSourceMap original data source map
+ * @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
+ */
+ void updateMemberState(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
/**
* Start periodical update.
*
- * @param dataSourceMap data source map
+ * @param originalDataSourceMap original data source map
* @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
*/
- void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName);
+ void startPeriodicalUpdate(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
/**
* Stop periodical update.
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
index 5075a56..2796324 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.ha.rule;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.ha.api.config.rule.HADataSourceRuleConfiguration;
import org.apache.shardingsphere.ha.spi.ReplicaLoadBalanceAlgorithm;
@@ -46,7 +45,6 @@ public final class HADataSourceRule {
private final boolean replicaQuery;
- @Getter(AccessLevel.NONE)
private final Collection<String> disabledDataSourceNames = new HashSet<>();
public HADataSourceRule(final HADataSourceRuleConfiguration config, final ReplicaLoadBalanceAlgorithm loadBalancer) {
@@ -92,8 +90,7 @@ public final class HADataSourceRule {
*/
public Map<String, Collection<String>> getDataSourceMapper() {
Map<String, Collection<String>> result = new HashMap<>(1, 1);
- Collection<String> actualDataSourceNames = new LinkedList<>();
- actualDataSourceNames.addAll(dataSourceNames);
+ Collection<String> actualDataSourceNames = new LinkedList<>(dataSourceNames);
result.put(name, actualDataSourceNames);
return result;
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index c5c21d7..af46490 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -73,15 +73,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new HADataSourceRule(each, loadBalanceAlgorithm));
}
+ Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
+ Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaConfiguration().getType(), config.getHaConfiguration().getProps());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
@@ -99,15 +102,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new HADataSourceRule(each, loadBalanceAlgorithm));
}
+ Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
+ Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaType().getType(), config.getHaType().getProps());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index 1bd200a..1641fe8 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.fixture;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
import java.util.Map;
/**
@@ -32,11 +33,15 @@ public final class TestHATypeFixture implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+ public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+ public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ }
+
+ @Override
+ public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
}
@Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index 6065d2e..b1a2aeb 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -27,12 +27,18 @@ import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.ha.spi.HAType;
import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
@@ -51,6 +57,8 @@ public final class MGRHAType implements HAType {
private static final String SINGLE_PRIMARY = "SELECT * FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'";
+ private static final String MEMBER_LIST = "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members";
+
private static CoordinatorRegistryCenter coordinatorRegistryCenter;
private ScheduleJobBootstrap scheduleJobBootstrap;
@@ -115,11 +123,19 @@ public final class MGRHAType implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
- String newPrimaryDataSource = determinePrimaryDataSource(dataSourceMap);
+ public void updatePrimaryDataSource(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+ }
+ String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
if (newPrimaryDataSource.isEmpty()) {
return;
}
+ // TODO post primary datasource event
+// if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
+// ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceUpdateEvent(schemaName, newPrimaryDataSource, newPrimaryDataSource));
+// }
oldPrimaryDataSource = newPrimaryDataSource;
}
@@ -149,8 +165,7 @@ public final class MGRHAType implements HAType {
private String findPrimaryDataSourceName(final String primaryDataSourceURL, final Map<String, DataSource> dataSourceMap) {
String result = "";
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- DataSource dataSource = entry.getValue();
- try (Connection connection = dataSource.getConnection()) {
+ try (Connection connection = entry.getValue().getConnection()) {
if (connection.getMetaData().getURL().contains(primaryDataSourceURL)) {
return entry.getKey();
}
@@ -162,13 +177,97 @@ public final class MGRHAType implements HAType {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+ public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+ }
+ List<String> memberDataSourceURLs = findMemberDataSourceURLs(activeDataSourceMap);
+ if (memberDataSourceURLs.isEmpty()) {
+ return;
+ }
+ Map<String, String> dataSourceURLs = new HashMap<>(16, 1);
+ determineDisabledDataSource(schemaName, activeDataSourceMap, memberDataSourceURLs, dataSourceURLs);
+ determineEnabledDataSource(originalDataSourceMap, schemaName, memberDataSourceURLs, dataSourceURLs);
+ }
+
+ private List<String> findMemberDataSourceURLs(final Map<String, DataSource> activeDataSourceMap) {
+ List<String> result = new LinkedList<>();
+ try (Connection connection = activeDataSourceMap.get(oldPrimaryDataSource).getConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(MEMBER_LIST);
+ while (resultSet.next()) {
+ if (!"ONLINE".equals(resultSet.getString("MEMBER_STATE"))) {
+ continue;
+ }
+ result.add(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")));
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find member data source urls", ex);
+ }
+ return result;
+ }
+
+ private void determineDisabledDataSource(final String schemaName, final Map<String, DataSource> activeDataSourceMap,
+ final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+ for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet()) {
+ boolean disable = true;
+ String url = "";
+ try (Connection connection = entry.getValue().getConnection()) {
+ url = connection.getMetaData().getURL();
+ for (String each : memberDataSourceURLs) {
+ if (url.contains(each)) {
+ disable = false;
+ break;
+ }
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find data source urls", ex);
+ }
+ if (disable) {
+ ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), true));
+ } else if (!"".equals(url)) {
+ dataSourceURLs.put(entry.getKey(), url);
+ }
+ }
+ }
+
+ private void determineEnabledDataSource(final Map<String, DataSource> originalDataSourceMap, final String schemaName,
+ final List<String> memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+ for (String each : memberDataSourceURLs) {
+ boolean enable = true;
+ for (Entry<String, String> entry : dataSourceURLs.entrySet()) {
+ if (entry.getValue().contains(each)) {
+ enable = false;
+ break;
+ }
+ }
+ if (!enable) {
+ continue;
+ }
+ for (Entry<String, DataSource> entry : originalDataSourceMap.entrySet()) {
+ String url;
+ try (Connection connection = entry.getValue().getConnection()) {
+ url = connection.getMetaData().getURL();
+ if (null != url && url.contains(each)) {
+ ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(schemaName, entry.getKey(), false));
+ break;
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find enable data source urls", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
if (null == coordinatorRegistryCenter) {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(props.getProperty("zkServerLists"), "mgr-elasticjob");
coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
coordinatorRegistryCenter.init();
}
- scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName),
+ scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, originalDataSourceMap, schemaName, disabledDataSourceNames),
JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
scheduleJobBootstrap.schedule();
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
index 4318191..f3cee18 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@@ -32,14 +34,20 @@ public final class MGRPeriodicalJob implements SimpleJob {
private final HAType haType;
- private final Map<String, DataSource> dataSourceMap;
+ private final Map<String, DataSource> originalDataSourceMap;
private final String schemaName;
+ private final Collection<String> disabledDataSourceNames;
+
@Override
public void execute(final ShardingContext shardingContext) {
- log.info("---------------MGRPeriodicalJob--------------");
- log.info("dataSourceMap: " + dataSourceMap.toString());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ Map<String, DataSource> activeDataSourceMap = new HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
+ }
+ log.info(" +++ " + activeDataSourceMap.toString());
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
}
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 204454e..b69a196 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.route.fixture;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
import java.util.Map;
/**
@@ -32,12 +33,15 @@ public final class TestRouteHATypeFixture implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName) {
-
+ public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ }
+
+ @Override
+ public void updateMemberState(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName) {
+ public void startPeriodicalUpdate(final Map<String, DataSource> originalDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
}
@Override
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 224379e..459abf0 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -18,10 +18,12 @@
package org.apache.shardingsphere.governance.core.registry;
import com.google.common.base.Strings;
+import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.lock.node.LockNode;
import org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,17 @@ public final class RegistryCenter {
}
/**
+ * Persist data source disabled state.
+ *
+ * @param event data source disabled event.
+ */
+ @Subscribe
+ public synchronized void renew(final DataSourceDisabledEvent event) {
+ String value = event.isDisabled() ? RegistryCenterNodeStatus.DISABLED.toString() : "";
+ repository.persist(node.getDataSourcePath(event.getSchemaName(), event.getDataSourceName()), value);
+ }
+
+ /**
* Persist instance online.
*/
public void persistInstanceOnline() {
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
similarity index 55%
copy from shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
copy to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
index 4318191..dff7bed 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
@@ -15,31 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.ha.mgr;
+package org.apache.shardingsphere.infra.rule.event.impl;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.ha.spi.HAType;
-
-import javax.sql.DataSource;
-import java.util.Map;
+import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
+/**
+ * Data source disabled event.
+ */
@RequiredArgsConstructor
-@Slf4j
-public final class MGRPeriodicalJob implements SimpleJob {
-
- private final HAType haType;
-
- private final Map<String, DataSource> dataSourceMap;
+@Getter
+public final class DataSourceDisabledEvent implements RuleChangedEvent {
private final String schemaName;
- @Override
- public void execute(final ShardingContext shardingContext) {
- log.info("---------------MGRPeriodicalJob--------------");
- log.info("dataSourceMap: " + dataSourceMap.toString());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
- }
+ private final String dataSourceName;
+
+ private final boolean isDisabled;
}