You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2021/01/07 13:32:46 UTC
[shardingsphere] branch master updated: #7318, add primary data source state (#8941)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 46c9a4e #7318, add primary data source state (#8941)
46c9a4e is described below
commit 46c9a4e34e2dec74266dc2cfe5cee9d178624369
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Thu Jan 7 21:32:19 2021 +0800
#7318, add primary data source state (#8941)
* #7318, add primary data source state
* #7318, add primary data source state
---
.../org/apache/shardingsphere/ha/spi/HAType.java | 8 ++-
.../shardingsphere/ha/rule/HADataSourceRule.java | 13 +++++
.../org/apache/shardingsphere/ha/rule/HARule.java | 19 +++++--
.../ha/fixture/TestHATypeFixture.java | 6 ++-
.../apache/shardingsphere/ha/mgr/MGRHAType.java | 28 ++++++----
.../shardingsphere/ha/mgr/MGRPeriodicalJob.java | 8 ++-
.../ha/route/fixture/TestRouteHATypeFixture.java | 6 ++-
.../metadata/GovernanceMetaDataContexts.java | 18 +++++++
.../governance/core/facade/GovernanceFacade.java | 1 +
.../governance/core/registry/RegistryCenter.java | 20 +++++++-
.../core/registry/RegistryCenterNode.java | 59 ++++++++++++++++++++++
.../registry/event/PrimaryStateChangedEvent.java | 35 +++++++++++++
.../listener/DataSourceStateChangedListener.java | 4 ++
.../core/registry/RegistryCenterNodeTest.java | 4 +-
.../rule/event/impl/PrimaryDataSourceEvent.java | 36 +++++++++++++
15 files changed, 240 insertions(+), 25 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index 9ebf43c..eb3a341 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -44,8 +44,10 @@ public interface HAType extends TypedSPI {
* @param dataSourceMap data source map
* @param schemaName schema name
* @param disabledDataSourceNames disabled data source names
+ * @param primaryDataSourceName primary data source name
+ * @param groupName group name
*/
- void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+ void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
/**
* Update member state.
@@ -62,8 +64,10 @@ public interface HAType extends TypedSPI {
* @param dataSourceMap data source map
* @param schemaName schema name
* @param disabledDataSourceNames disabled data source names
+ * @param primaryDataSourceName primary data source name
+ * @param groupName group name
*/
- void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+ void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
/**
* Stop periodical update.
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
index 2796324..9bffceb 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.rule;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.ha.api.config.rule.HADataSourceRuleConfiguration;
import org.apache.shardingsphere.ha.spi.ReplicaLoadBalanceAlgorithm;
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
* HA data source rule.
*/
@Getter
+@Slf4j
public final class HADataSourceRule {
private final String name;
@@ -47,6 +49,8 @@ public final class HADataSourceRule {
private final Collection<String> disabledDataSourceNames = new HashSet<>();
+ private String primaryDataSourceName;
+
public HADataSourceRule(final HADataSourceRuleConfiguration config, final ReplicaLoadBalanceAlgorithm loadBalancer) {
checkConfiguration(config);
name = config.getName();
@@ -84,6 +88,15 @@ public final class HADataSourceRule {
}
/**
+ * Update primary data source name.
+ *
+ * @param dataSourceName data source name
+ */
+ public void updatePrimaryDataSourceName(final String dataSourceName) {
+ primaryDataSourceName = dataSourceName;
+ }
+
+ /**
* Get data source mapper.
*
* @return data source mapper
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index af46490..f9f9d04 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.ha.spi.HAType;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
import org.apache.shardingsphere.infra.rule.type.DataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.type.StatusContainedRule;
import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
@@ -75,16 +76,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
}
Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
+ String groupName = dataSourceRules.values().iterator().next().getName();
+ String primaryDataSourceName = dataSourceRules.values().iterator().next().getPrimaryDataSourceName();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaConfiguration().getType(), config.getHaConfiguration().getProps());
- haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
@@ -104,16 +107,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
}
Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
+ String groupName = config.getDataSources().iterator().next().getName();
+ String primaryDataSourceName = dataSourceRules.values().iterator().next().getPrimaryDataSourceName();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaType().getType(), config.getHaType().getProps());
- haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
@@ -162,6 +167,12 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
entry.getValue().updateDisabledDataSourceNames(((DataSourceNameDisabledEvent) event).getDataSourceName(), ((DataSourceNameDisabledEvent) event).isDisabled());
}
+ } else if (event instanceof PrimaryDataSourceEvent) {
+ for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
+ if (entry.getValue().getName().equals(((PrimaryDataSourceEvent) event).getGroupName())) {
+ entry.getValue().updatePrimaryDataSourceName(((PrimaryDataSourceEvent) event).getDataSourceName());
+ }
+ }
}
}
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index d545494..7eb0565 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -33,7 +33,8 @@ public final class TestHATypeFixture implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
}
@Override
@@ -41,7 +42,8 @@ public final class TestHATypeFixture implements HAType {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
}
@Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index fdb5bdc..9d64169 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.ha.spi.HAType;
import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -123,20 +124,24 @@ public final class MGRHAType implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
Map<String, DataSource> activeDataSourceMap = new HashMap<>(dataSourceMap);
if (!disabledDataSourceNames.isEmpty()) {
activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
}
- String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
- if (newPrimaryDataSource.isEmpty()) {
- return;
+ if (null == primaryDataSourceName || primaryDataSourceName.equals(oldPrimaryDataSource)) {
+ String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
+ if (newPrimaryDataSource.isEmpty()) {
+ return;
+ }
+ if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
+ oldPrimaryDataSource = newPrimaryDataSource;
+ ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceEvent(schemaName, groupName, newPrimaryDataSource));
+ }
+ } else {
+ oldPrimaryDataSource = primaryDataSourceName;
}
- // TODO post primary datasource event
-// if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
-// ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceUpdateEvent(schemaName, newPrimaryDataSource, newPrimaryDataSource));
-// }
- oldPrimaryDataSource = newPrimaryDataSource;
}
private String determinePrimaryDataSource(final Map<String, DataSource> dataSourceMap) {
@@ -263,13 +268,14 @@ public final class MGRHAType implements HAType {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
if (null == coordinatorRegistryCenter) {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(props.getProperty("zkServerLists"), "mgr-elasticjob");
coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
coordinatorRegistryCenter.init();
}
- scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames),
+ scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName),
JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
scheduleJobBootstrap.schedule();
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
index 701e4b4..606bff0 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
@@ -40,14 +40,18 @@ public final class MGRPeriodicalJob implements SimpleJob {
private final Collection<String> disabledDataSourceNames;
+ private final String groupName;
+
+ private final String primaryDataSourceName;
+
@Override
public void execute(final ShardingContext shardingContext) {
Map<String, DataSource> activeDataSourceMap = new HashMap<>(dataSourceMap);
if (!disabledDataSourceNames.isEmpty()) {
activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
}
- log.info(" +++ " + activeDataSourceMap.toString());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName, disabledDataSourceNames);
+ log.info("|activeDataSourceMap| " + activeDataSourceMap.toString());
+ haType.updatePrimaryDataSource(dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
haType.updateMemberState(dataSourceMap, schemaName, disabledDataSourceNames);
}
}
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 7a28f50..685cbdd 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -33,7 +33,8 @@ public final class TestRouteHATypeFixture implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
}
@Override
@@ -41,7 +42,8 @@ public final class TestRouteHATypeFixture implements HAType {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+ public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+ final String groupName, final String primaryDataSourceName) {
}
@Override
diff --git a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
index 2af684c..97e0bde 100644
--- a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
+++ b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurat
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaChangedEvent;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
+import org.apache.shardingsphere.governance.core.registry.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -45,6 +46,7 @@ import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.optimize.context.CalciteContextFactory;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
import org.apache.shardingsphere.infra.rule.type.StatusContainedRule;
import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.infra.state.StateEvent;
@@ -260,6 +262,22 @@ public final class GovernanceMetaDataContexts implements MetaDataContexts {
}
}
+ /**
+ * Renew primary data source names.
+ *
+ * @param event primary state changed event
+ */
+ @Subscribe
+ public synchronized void renew(final PrimaryStateChangedEvent event) {
+ GovernanceSchema governanceSchema = event.getGovernanceSchema();
+ Collection<ShardingSphereRule> rules = metaDataContexts.getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
+ for (ShardingSphereRule each : rules) {
+ if (each instanceof StatusContainedRule) {
+ ((StatusContainedRule) each).updateRuleStatus(new PrimaryDataSourceEvent(governanceSchema.getSchemaName(), governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
+ }
+ }
+ }
+
private ShardingSphereMetaData createAddedMetaData(final MetaDataAddedEvent event) throws SQLException {
String schemaName = event.getSchemaName();
Map<String, Map<String, DataSource>> dataSourcesMap = createDataSourcesMap(Collections.singletonMap(schemaName,
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
index 5116f1b..fba1dc3 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
@@ -87,6 +87,7 @@ public final class GovernanceFacade implements AutoCloseable {
public void onlineInstance() {
registryCenter.persistInstanceOnline();
registryCenter.persistDataNodes();
+ registryCenter.persistPrimaryNodes();
listenerManager.init();
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 459abf0..88d2102 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.governance.core.registry.instance.GovernanceIns
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -58,7 +59,7 @@ public final class RegistryCenter {
/**
* Persist data source disabled state.
*
- * @param event data source disabled event.
+ * @param event data source disabled event
*/
@Subscribe
public synchronized void renew(final DataSourceDisabledEvent event) {
@@ -67,6 +68,16 @@ public final class RegistryCenter {
}
/**
+ * Persist primary data source state.
+ *
+ * @param event primary data source event
+ */
+ @Subscribe
+ public synchronized void renew(final PrimaryDataSourceEvent event) {
+ repository.persist(node.getPrimaryDataSourcePath(event.getSchemaName(), event.getGroupName()), event.getDataSourceName());
+ }
+
+ /**
* Persist instance online.
*/
public void persistInstanceOnline() {
@@ -81,6 +92,13 @@ public final class RegistryCenter {
}
/**
+ * Initialize primary nodes.
+ */
+ public void persistPrimaryNodes() {
+ repository.persist(node.getPrimaryNodesPath(), "");
+ }
+
+ /**
* Persist instance data.
*
* @param instanceData instance data
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
index 48a4850..88a1c5b 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
@@ -39,6 +39,8 @@ public final class RegistryCenterNode {
private static final String DATA_NODES_NAME = "datanodes";
+ private static final String PRIMARY_NODES_NAME = "primarynodes";
+
/**
* Get proxy node path.
*
@@ -59,6 +61,15 @@ public final class RegistryCenterNode {
}
/**
+ * Get primary nodes path.
+ *
+ * @return primary nodes path
+ */
+ public String getPrimaryNodesPath() {
+ return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME);
+ }
+
+ /**
* Get schema path.
*
* @param schemaName schema name
@@ -69,6 +80,16 @@ public final class RegistryCenterNode {
}
/**
+ * Get primary nodes schema path.
+ *
+ * @param schemaName schema name
+ * @return schema path
+ */
+ public String getPrimaryNodesSchemaPath(final String schemaName) {
+ return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME, schemaName);
+ }
+
+ /**
* Get data source path.
*
* @param schemaName schema name
@@ -80,6 +101,17 @@ public final class RegistryCenterNode {
}
/**
+ * Get primary data source path.
+ *
+ * @param schemaName schema name
+ * @param groupName group name
+ * @return data source path
+ */
+ public String getPrimaryDataSourcePath(final String schemaName, final String groupName) {
+ return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME, schemaName, groupName);
+ }
+
+ /**
* Get governance schema.
*
* @param dataSourceNodeFullPath data source node full path
@@ -91,6 +123,32 @@ public final class RegistryCenterNode {
return matcher.find() ? Optional.of(new GovernanceSchema(matcher.group(1), matcher.group(2))) : Optional.empty();
}
+
+ /**
+ * Get primary nodes governance schema.
+ *
+ * @param dataSourceNodeFullPath data source node full path
+ * @return primary nodes governance schema
+ */
+ public Optional<GovernanceSchema> getPrimaryNodesGovernanceSchema(final String dataSourceNodeFullPath) {
+ Pattern pattern = Pattern.compile(getPrimaryNodesPath() + "/" + "(\\w+)/(\\w+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(dataSourceNodeFullPath);
+ return matcher.find() ? Optional.of(new GovernanceSchema(matcher.group(1), matcher.group(2))) : Optional.empty();
+ }
+
+
+ /**
+ * Is primary data source path.
+ *
+ * @param dataSourceNodeFullPath data source node full path
+ * @return is primary data source path
+ */
+ public boolean isPrimaryDataSourcePath(final String dataSourceNodeFullPath) {
+ Pattern pattern = Pattern.compile(getPrimaryNodesPath() + "/" + "(\\w+)/(\\w+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(dataSourceNodeFullPath);
+ return matcher.find();
+ }
+
/**
* Get all schema path.
*
@@ -101,6 +159,7 @@ public final class RegistryCenterNode {
Collection<String> result = new ArrayList<>(schemaNames.size());
for (String schemaName : schemaNames) {
result.add(getSchemaPath(schemaName));
+ result.add(getPrimaryNodesSchemaPath(schemaName));
}
return result;
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java
new file mode 100644
index 0000000..85c3697
--- /dev/null
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
+import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
+
+/**
+ * Primary state event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class PrimaryStateChangedEvent implements GovernanceEvent {
+
+ private final GovernanceSchema governanceSchema;
+
+ private final String primaryDataSourceName;
+}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
index a9a0f6e..36811bf 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
import org.apache.shardingsphere.governance.core.registry.RegistryCenterNode;
import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
+import org.apache.shardingsphere.governance.core.registry.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
@@ -43,6 +44,9 @@ public final class DataSourceStateChangedListener extends PostGovernanceReposito
@Override
protected Optional<GovernanceEvent> createEvent(final DataChangedEvent event) {
+ if (registryCenterNode.isPrimaryDataSourcePath(event.getKey())) {
+ return registryCenterNode.getPrimaryNodesGovernanceSchema(event.getKey()).map(schema -> new PrimaryStateChangedEvent(schema, event.getValue()));
+ }
return registryCenterNode.getGovernanceSchema(event.getKey()).map(schema -> new DisabledStateChangedEvent(schema, isDataSourceDisabled(event)));
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
index 4237a2b..e954e05 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
@@ -58,8 +58,10 @@ public final class RegistryCenterNodeTest {
@Test
public void assertGetAllSchemaPaths() {
Collection<String> schemaPaths = registryCenterNode.getAllSchemaPaths(Arrays.asList("replica_query_db", "sharding_db"));
- assertThat(schemaPaths.size(), is(2));
+ assertThat(schemaPaths.size(), is(4));
assertThat(schemaPaths, hasItem("/states/datanodes/replica_query_db"));
assertThat(schemaPaths, hasItem("/states/datanodes/sharding_db"));
+ assertThat(schemaPaths, hasItem("/states/primarynodes/replica_query_db"));
+ assertThat(schemaPaths, hasItem("/states/primarynodes/sharding_db"));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java
new file mode 100644
index 0000000..f7ee5f2
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule.event.impl;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
+
+/**
+ * Primary data source event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class PrimaryDataSourceEvent implements RuleChangedEvent {
+
+ private final String schemaName;
+
+ private final String groupName;
+
+ private final String dataSourceName;
+}