You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2021/01/07 13:32:46 UTC

[shardingsphere] branch master updated: #7318, add primary data source state (#8941)

This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 46c9a4e  #7318, add primary data source state (#8941)
46c9a4e is described below

commit 46c9a4e34e2dec74266dc2cfe5cee9d178624369
Author: Zhang Yonglun <zh...@apache.org>
AuthorDate: Thu Jan 7 21:32:19 2021 +0800

    #7318, add primary data source state (#8941)
    
    * #7318, add primary data source state
    
    * #7318, add primary data source state
---
 .../org/apache/shardingsphere/ha/spi/HAType.java   |  8 ++-
 .../shardingsphere/ha/rule/HADataSourceRule.java   | 13 +++++
 .../org/apache/shardingsphere/ha/rule/HARule.java  | 19 +++++--
 .../ha/fixture/TestHATypeFixture.java              |  6 ++-
 .../apache/shardingsphere/ha/mgr/MGRHAType.java    | 28 ++++++----
 .../shardingsphere/ha/mgr/MGRPeriodicalJob.java    |  8 ++-
 .../ha/route/fixture/TestRouteHATypeFixture.java   |  6 ++-
 .../metadata/GovernanceMetaDataContexts.java       | 18 +++++++
 .../governance/core/facade/GovernanceFacade.java   |  1 +
 .../governance/core/registry/RegistryCenter.java   | 20 +++++++-
 .../core/registry/RegistryCenterNode.java          | 59 ++++++++++++++++++++++
 .../registry/event/PrimaryStateChangedEvent.java   | 35 +++++++++++++
 .../listener/DataSourceStateChangedListener.java   |  4 ++
 .../core/registry/RegistryCenterNodeTest.java      |  4 +-
 .../rule/event/impl/PrimaryDataSourceEvent.java    | 36 +++++++++++++
 15 files changed, 240 insertions(+), 25 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index 9ebf43c..eb3a341 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -44,8 +44,10 @@ public interface HAType extends TypedSPI {
      * @param dataSourceMap data source map
      * @param schemaName schema name
      * @param disabledDataSourceNames disabled data source names
+     * @param groupName group name
+     * @param primaryDataSourceName primary data source name
      */
-    void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+    void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
     
     /**
      * Update member state.
@@ -62,8 +64,10 @@ public interface HAType extends TypedSPI {
      * @param dataSourceMap data source map
      * @param schemaName schema name
      * @param disabledDataSourceNames disabled data source names
+     * @param groupName group name
+     * @param primaryDataSourceName primary data source name
      */
-    void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+    void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName, Collection<String> disabledDataSourceNames, String groupName, String primaryDataSourceName);
     
     /**
      * Stop periodical update.
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
index 2796324..9bffceb 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.rule;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.ha.api.config.rule.HADataSourceRuleConfiguration;
 import org.apache.shardingsphere.ha.spi.ReplicaLoadBalanceAlgorithm;
 
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
  * HA data source rule.
  */
 @Getter
+@Slf4j
 public final class HADataSourceRule {
     
     private final String name;
@@ -47,6 +49,8 @@ public final class HADataSourceRule {
     
     private final Collection<String> disabledDataSourceNames = new HashSet<>();
     
+    private String primaryDataSourceName;
+    
     public HADataSourceRule(final HADataSourceRuleConfiguration config, final ReplicaLoadBalanceAlgorithm loadBalancer) {
         checkConfiguration(config);
         name = config.getName();
@@ -84,6 +88,15 @@ public final class HADataSourceRule {
     }
     
     /**
+     * Update primary data source name.
+     *
+     * @param dataSourceName data source name
+     */
+    public void updatePrimaryDataSourceName(final String dataSourceName) {
+        primaryDataSourceName = dataSourceName;
+    }
+    
+    /**
      * Get data source mapper.
      *
      * @return data source mapper
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index af46490..f9f9d04 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.ha.spi.HAType;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
 import org.apache.shardingsphere.infra.rule.type.DataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.type.StatusContainedRule;
 import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
@@ -75,16 +76,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
         }
         Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
         Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
+        String groupName = dataSourceRules.values().iterator().next().getName();
+        String primaryDataSourceName = dataSourceRules.values().iterator().next().getPrimaryDataSourceName();
         if (null == haType) {
             haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaConfiguration().getType(), config.getHaConfiguration().getProps());
-            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
             haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } else {
             haType.stopPeriodicalUpdate();
         }
         try {
             haType.checkHAConfig(dataSourceMap, schemaName);
-            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
@@ -104,16 +107,18 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
         }
         Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap);
         Collection<String> disabledDataSourceNames = dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
+        String groupName = config.getDataSources().iterator().next().getName();
+        String primaryDataSourceName = dataSourceRules.values().iterator().next().getPrimaryDataSourceName();
         if (null == haType) {
             haType = TypedSPIRegistry.getRegisteredService(HAType.class, config.getHaType().getType(), config.getHaType().getProps());
-            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.updatePrimaryDataSource(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
             haType.updateMemberState(originalDataSourceMap, schemaName, disabledDataSourceNames);
         } else {
             haType.stopPeriodicalUpdate();
         }
         try {
             haType.checkHAConfig(dataSourceMap, schemaName);
-            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames);
+            haType.startPeriodicalUpdate(originalDataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
@@ -162,6 +167,12 @@ public final class HARule implements DataSourceContainedRule, StatusContainedRul
             for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
                 entry.getValue().updateDisabledDataSourceNames(((DataSourceNameDisabledEvent) event).getDataSourceName(), ((DataSourceNameDisabledEvent) event).isDisabled());
             }
+        } else if (event instanceof PrimaryDataSourceEvent) {
+            for (Entry<String, HADataSourceRule> entry : dataSourceRules.entrySet()) {
+                if (entry.getValue().getName().equals(((PrimaryDataSourceEvent) event).getGroupName())) {
+                    entry.getValue().updatePrimaryDataSourceName(((PrimaryDataSourceEvent) event).getDataSourceName());
+                }
+            }
         }
     }
 }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index d545494..7eb0565 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -33,7 +33,8 @@ public final class TestHATypeFixture implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                        final String groupName, final String primaryDataSourceName) {
     }
     
     @Override
@@ -41,7 +42,8 @@ public final class TestHATypeFixture implements HAType {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                      final String groupName, final String primaryDataSourceName) {
     }
     
     @Override
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index fdb5bdc..9d64169 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.ha.spi.HAType;
 import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -123,20 +124,24 @@ public final class MGRHAType implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void updatePrimaryDataSource(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                        final String groupName, final String primaryDataSourceName) {
         Map<String, DataSource> activeDataSourceMap = new HashMap<>(dataSourceMap);
         if (!disabledDataSourceNames.isEmpty()) {
             activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
         }
-        String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
-        if (newPrimaryDataSource.isEmpty()) {
-            return;
+        if (null == primaryDataSourceName || primaryDataSourceName.equals(oldPrimaryDataSource)) {
+            String newPrimaryDataSource = determinePrimaryDataSource(activeDataSourceMap);
+            if (newPrimaryDataSource.isEmpty()) {
+                return;
+            }
+            if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
+                oldPrimaryDataSource = newPrimaryDataSource;
+                ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceEvent(schemaName, groupName, newPrimaryDataSource));
+            }
+        } else {
+            oldPrimaryDataSource = primaryDataSourceName;
         }
-        // TODO post primary datasource event
-//        if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
-//             ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceUpdateEvent(schemaName, newPrimaryDataSource, newPrimaryDataSource));
-//        }
-        oldPrimaryDataSource = newPrimaryDataSource;
     }
     
     private String determinePrimaryDataSource(final Map<String, DataSource> dataSourceMap) {
@@ -263,13 +268,14 @@ public final class MGRHAType implements HAType {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                      final String groupName, final String primaryDataSourceName) {
         if (null == coordinatorRegistryCenter) {
             ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(props.getProperty("zkServerLists"), "mgr-elasticjob");
             coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
             coordinatorRegistryCenter.init();
         }
-        scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames),
+        scheduleJobBootstrap = new ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this, dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName),
                 JobConfiguration.newBuilder("MGRPeriodicalJob", 1).cron(props.getProperty("keepAliveCron")).build());
         scheduleJobBootstrap.schedule();
     }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
index 701e4b4..606bff0 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
@@ -40,14 +40,18 @@ public final class MGRPeriodicalJob implements SimpleJob {
     
     private final Collection<String> disabledDataSourceNames;
     
+    private final String groupName;
+    
+    private final String primaryDataSourceName;
+    
     @Override
     public void execute(final ShardingContext shardingContext) {
         Map<String, DataSource> activeDataSourceMap = new HashMap<>(dataSourceMap);
         if (!disabledDataSourceNames.isEmpty()) {
             activeDataSourceMap.entrySet().removeIf(each -> disabledDataSourceNames.contains(each.getKey()));
         }
-        log.info(" +++ " + activeDataSourceMap.toString());
-        haType.updatePrimaryDataSource(dataSourceMap, schemaName, disabledDataSourceNames);
+        log.info("|activeDataSourceMap| " + activeDataSourceMap.toString());
+        haType.updatePrimaryDataSource(dataSourceMap, schemaName, disabledDataSourceNames, groupName, primaryDataSourceName);
         haType.updateMemberState(dataSourceMap, schemaName, disabledDataSourceNames);
     }
 }
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 7a28f50..685cbdd 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -33,7 +33,8 @@ public final class TestRouteHATypeFixture implements HAType {
     }
     
     @Override
-    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void updatePrimaryDataSource(final Map<String, DataSource> activeDataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                        final String groupName, final String primaryDataSourceName) {
     }
     
     @Override
@@ -41,7 +42,8 @@ public final class TestRouteHATypeFixture implements HAType {
     }
     
     @Override
-    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames) {
+    public void startPeriodicalUpdate(final Map<String, DataSource> dataSourceMap, final String schemaName, final Collection<String> disabledDataSourceNames,
+                                      final String groupName, final String primaryDataSourceName) {
     }
     
     @Override
diff --git a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
index 2af684c..97e0bde 100644
--- a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
+++ b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/metadata/GovernanceMetaDataContexts.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurat
 import org.apache.shardingsphere.governance.core.event.model.schema.SchemaChangedEvent;
 import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
 import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
+import org.apache.shardingsphere.governance.core.registry.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
 import org.apache.shardingsphere.infra.auth.Authentication;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -45,6 +46,7 @@ import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.optimize.context.CalciteContextFactory;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
 import org.apache.shardingsphere.infra.rule.type.StatusContainedRule;
 import org.apache.shardingsphere.infra.state.StateContext;
 import org.apache.shardingsphere.infra.state.StateEvent;
@@ -260,6 +262,22 @@ public final class GovernanceMetaDataContexts implements MetaDataContexts {
         }
     }
     
+    /**
+     * Renew primary data source names.
+     *
+     * @param event primary state changed event
+     */
+    @Subscribe
+    public synchronized void renew(final PrimaryStateChangedEvent event) {
+        GovernanceSchema governanceSchema = event.getGovernanceSchema();
+        Collection<ShardingSphereRule> rules = metaDataContexts.getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
+        for (ShardingSphereRule each : rules) {
+            if (each instanceof StatusContainedRule) {
+                ((StatusContainedRule) each).updateRuleStatus(new PrimaryDataSourceEvent(governanceSchema.getSchemaName(), governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
+            }
+        }
+    }
+    
     private ShardingSphereMetaData createAddedMetaData(final MetaDataAddedEvent event) throws SQLException {
         String schemaName = event.getSchemaName();
         Map<String, Map<String, DataSource>> dataSourcesMap = createDataSourcesMap(Collections.singletonMap(schemaName, 
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
index 5116f1b..fba1dc3 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
@@ -87,6 +87,7 @@ public final class GovernanceFacade implements AutoCloseable {
     public void onlineInstance() {
         registryCenter.persistInstanceOnline();
         registryCenter.persistDataNodes();
+        registryCenter.persistPrimaryNodes();
         listenerManager.init();
     }
     
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 459abf0..88d2102 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.governance.core.registry.instance.GovernanceIns
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
 
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -58,7 +59,7 @@ public final class RegistryCenter {
     /**
      * Persist data source disabled state.
      *
-     * @param event data source disabled event.
+     * @param event data source disabled event
      */
     @Subscribe
     public synchronized void renew(final DataSourceDisabledEvent event) {
@@ -67,6 +68,16 @@ public final class RegistryCenter {
     }
     
     /**
+     * Persist primary data source state.
+     *
+     * @param event primary data source event
+     */
+    @Subscribe
+    public synchronized void renew(final PrimaryDataSourceEvent event) {
+        repository.persist(node.getPrimaryDataSourcePath(event.getSchemaName(), event.getGroupName()), event.getDataSourceName());
+    }
+    
+    /**
      * Persist instance online.
      */
     public void persistInstanceOnline() {
@@ -81,6 +92,13 @@ public final class RegistryCenter {
     }
     
     /**
+     * Initialize primary nodes.
+     */
+    public void persistPrimaryNodes() {
+        repository.persist(node.getPrimaryNodesPath(), "");
+    }
+    
+    /**
      * Persist instance data.
      * 
      * @param instanceData instance data
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
index 48a4850..88a1c5b 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
@@ -39,6 +39,8 @@ public final class RegistryCenterNode {
     
     private static final String DATA_NODES_NAME = "datanodes";
     
+    private static final String PRIMARY_NODES_NAME = "primarynodes";
+    
     /**
      * Get proxy node path.
      *
@@ -59,6 +61,15 @@ public final class RegistryCenterNode {
     }
     
     /**
+     * Get primary nodes path.
+     *
+     * @return primary nodes path
+     */
+    public String getPrimaryNodesPath() {
+        return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME);
+    }
+    
+    /**
      * Get schema path.
      * 
      * @param schemaName schema name
@@ -69,6 +80,16 @@ public final class RegistryCenterNode {
     }
     
     /**
+     * Get primary nodes schema path.
+     *
+     * @param schemaName schema name
+     * @return primary nodes schema path
+     */
+    public String getPrimaryNodesSchemaPath(final String schemaName) {
+        return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME, schemaName);
+    }
+    
+    /**
      * Get data source path.
      * 
      * @param schemaName schema name
@@ -80,6 +101,17 @@ public final class RegistryCenterNode {
     }
     
     /**
+     * Get primary data source path.
+     *
+     * @param schemaName schema name
+     * @param groupName group name
+     * @return primary data source path
+     */
+    public String getPrimaryDataSourcePath(final String schemaName, final String groupName) {
+        return Joiner.on("/").join("", ROOT, PRIMARY_NODES_NAME, schemaName, groupName);
+    }
+    
+    /**
      * Get governance schema.
      *
      * @param dataSourceNodeFullPath data source node full path
@@ -91,6 +123,32 @@ public final class RegistryCenterNode {
         return matcher.find() ? Optional.of(new GovernanceSchema(matcher.group(1), matcher.group(2))) : Optional.empty();
     }
     
+    
+    /**
+     * Get primary nodes governance schema.
+     *
+     * @param dataSourceNodeFullPath data source node full path
+     * @return primary nodes governance schema
+     */
+    public Optional<GovernanceSchema> getPrimaryNodesGovernanceSchema(final String dataSourceNodeFullPath) {
+        Pattern pattern = Pattern.compile(getPrimaryNodesPath() + "/" + "(\\w+)/(\\w+)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(dataSourceNodeFullPath);
+        return matcher.find() ? Optional.of(new GovernanceSchema(matcher.group(1), matcher.group(2))) : Optional.empty();
+    }
+    
+    
+    /**
+     * Is primary data source path.
+     *
+     * @param dataSourceNodeFullPath data source node full path
+     * @return is primary data source path
+     */
+    public boolean isPrimaryDataSourcePath(final String dataSourceNodeFullPath) {
+        Pattern pattern = Pattern.compile(getPrimaryNodesPath() + "/" + "(\\w+)/(\\w+)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(dataSourceNodeFullPath);
+        return matcher.find();
+    }
+    
     /**
      * Get all schema path.
      * 
@@ -101,6 +159,7 @@ public final class RegistryCenterNode {
         Collection<String> result = new ArrayList<>(schemaNames.size());
         for (String schemaName : schemaNames) {
             result.add(getSchemaPath(schemaName));
+            result.add(getPrimaryNodesSchemaPath(schemaName));
         }
         return result;
     }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java
new file mode 100644
index 0000000..85c3697
--- /dev/null
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/event/PrimaryStateChangedEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
+import org.apache.shardingsphere.governance.core.registry.schema.GovernanceSchema;
+
+/**
+ * Primary state changed event.
+ *
+ * <p>Governance event that pairs a {@link GovernanceSchema} with the name of a primary data source.
+ * Both values are supplied through the Lombok-generated constructor and exposed through generated getters.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class PrimaryStateChangedEvent implements GovernanceEvent {
+    
+    // Governance schema this event refers to.
+    private final GovernanceSchema governanceSchema;
+    
+    // Name of the primary data source carried by this event.
+    private final String primaryDataSourceName;
+}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
index a9a0f6e..36811bf 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/listener/DataSourceStateChangedListener.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
 import org.apache.shardingsphere.governance.core.registry.RegistryCenterNode;
 import org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
 import org.apache.shardingsphere.governance.core.registry.event.DisabledStateChangedEvent;
+import org.apache.shardingsphere.governance.core.registry.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
 import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
@@ -43,6 +44,9 @@ public final class DataSourceStateChangedListener extends PostGovernanceReposito
     
     @Override
     protected Optional<GovernanceEvent> createEvent(final DataChangedEvent event) {
+        // Resolve the primary nodes schema once: a present result already means the key is a primary data source path,
+        // so the key does not have to be matched against the same pattern a second time.
+        Optional<GovernanceEvent> primaryEvent = registryCenterNode.getPrimaryNodesGovernanceSchema(event.getKey()).map(schema -> new PrimaryStateChangedEvent(schema, event.getValue()));
+        if (primaryEvent.isPresent()) {
+            return primaryEvent;
+        }
+        // Any other key is handled as a data source enable/disable state change.
         return registryCenterNode.getGovernanceSchema(event.getKey()).map(schema -> new DisabledStateChangedEvent(schema, isDataSourceDisabled(event)));
     }
     
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
index 4237a2b..e954e05 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeTest.java
@@ -58,8 +58,10 @@ public final class RegistryCenterNodeTest {
     @Test
     public void assertGetAllSchemaPaths() {
+        // Every schema name contributes two paths: one under /states/datanodes and one under /states/primarynodes.
         Collection<String> schemaPaths = registryCenterNode.getAllSchemaPaths(Arrays.asList("replica_query_db", "sharding_db"));
-        assertThat(schemaPaths.size(), is(2));
+        assertThat(schemaPaths.size(), is(4));
         assertThat(schemaPaths, hasItem("/states/datanodes/replica_query_db"));
         assertThat(schemaPaths, hasItem("/states/datanodes/sharding_db"));
+        assertThat(schemaPaths, hasItem("/states/primarynodes/replica_query_db"));
+        assertThat(schemaPaths, hasItem("/states/primarynodes/sharding_db"));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java
new file mode 100644
index 0000000..f7ee5f2
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/PrimaryDataSourceEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule.event.impl;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
+
+/**
+ * Primary data source event.
+ *
+ * <p>Rule changed event that carries a schema name, a group name and a data source name.
+ * All three values are supplied through the Lombok-generated constructor and exposed through generated getters.
+ * NOTE(review): presumably the data source name is the primary data source of the named group; confirm against the publisher.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class PrimaryDataSourceEvent implements RuleChangedEvent {
+    
+    private final String schemaName;
+    
+    private final String groupName;
+    
+    private final String dataSourceName;
+}