You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/07/05 10:19:21 UTC

[shardingsphere] branch master updated: Restart heartbeat job when primary data source has changed. (#18858)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 45145a16bdb Restart heartbeat job when primary data source has changed. (#18858)
45145a16bdb is described below

commit 45145a16bdbd9aa439f2c06992f90d91d89f8ad8
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Jul 5 18:19:16 2022 +0800

    Restart heartbeat job when primary data source has changed. (#18858)
    
    * Restart heartbeat job when primary datasource has changed
    
    * restart heart beat job
    
    * Fix ci
    
    * Fix ci
    
    * Fix bug
    
    * Remove useless import
---
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    | 77 +++++++++++-----------
 .../rule/ReadwriteSplittingRule.java               | 10 ++-
 .../identifier/type/RestartHeartBeatJobRule.java   | 35 ++++++++++
 .../ClusterContextManagerCoordinator.java          |  7 +-
 .../ClusterContextManagerCoordinatorTest.java      |  9 +--
 5 files changed, 87 insertions(+), 51 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 53544205984..52b29a6350f 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.dbdiscovery.rule;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.Getter;
 import org.apache.shardingsphere.dbdiscovery.algorithm.DatabaseDiscoveryEngine;
@@ -32,9 +33,11 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
 import org.apache.shardingsphere.infra.datasource.strategy.DynamicDataSourceStrategyFactory;
 import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -53,12 +56,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * Database discovery rule.
  */
-public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, StatusContainedRule, ExportableRule {
+public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule, ExportableRule {
     
     @Getter
     private final RuleConfiguration configuration;
@@ -79,8 +81,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         discoveryTypes = getDiscoveryProviderAlgorithms(ruleConfig.getDiscoveryTypes());
         dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());
         findPrimaryReplicaRelationship(databaseName, dataSourceMap);
-        initAware();
-        initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+        initAwareAndHeartBeatJobs(instanceContext);
     }
     
     public DatabaseDiscoveryRule(final String databaseName,
@@ -91,8 +92,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         discoveryTypes = ruleConfig.getDiscoveryTypes();
         dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());
         findPrimaryReplicaRelationship(databaseName, dataSourceMap);
-        initAware();
-        initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+        initAwareAndHeartBeatJobs(instanceContext);
     }
     
     private static Map<String, DatabaseDiscoveryProviderAlgorithm> getDiscoveryProviderAlgorithms(final Map<String, ShardingSphereAlgorithmConfiguration> discoveryTypesConfig) {
@@ -125,25 +125,6 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
         }
     }
     
-    private void initAware() {
-        DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> optional.init(this));
-    }
-    
-    private void initHeartBeatJobs(final String instanceId) {
-        Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get(instanceId);
-        if (modeScheduleContext.isPresent()) {
-            for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
-                DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-                Map<String, DataSource> dataSources = dataSourceMap.entrySet().stream().filter(each -> !rule.getDisabledDataSourceNames().contains(each.getKey()))
-                        .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-                String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();
-                CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), dataSources,
-                        rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames()).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron"));
-                modeScheduleContext.get().startCronJob(job);
-            }
-        }
-    }
-    
     /**
      * Get single data source rule.
      *
@@ -173,21 +154,41 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
     }
     
     @Override
-    public void updateStatus(final DataSourceStatusChangedEvent event) {
-        if (event instanceof StorageNodeDataSourceChangedEvent) {
+    public void restart(final DataSourceStatusChangedEvent event, final InstanceContext instanceContext) {
+        PrimaryDataSourceChangedEvent dataSourceEvent = (PrimaryDataSourceChangedEvent) event;
+        QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
+        DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());
+        Preconditions.checkState(null != dataSourceRule, "Can 't find database discovery data source rule in database `%s`.", databaseName);
+        dataSourceRule.changePrimaryDataSourceName(qualifiedDatabase.getDataSourceName());
+        initAwareAndHeartBeatJobs(instanceContext);
+    }
+    
+    private void initAwareAndHeartBeatJobs(final InstanceContext instanceContext) {
+        DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> optional.init(this));
+        initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+    }
+    
+    private void initHeartBeatJobs(final String instanceId) {
+        Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get(instanceId);
+        if (modeScheduleContext.isPresent()) {
             for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
-                StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
-                if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {
-                    entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-                } else {
-                    entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-                }
+                DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+                String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();
+                CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), dataSourceMap,
+                        rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames()).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+                modeScheduleContext.get().startCronJob(job);
             }
-        } else if (event instanceof PrimaryDataSourceChangedEvent) {
-            for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
-                if (entry.getValue().getGroupName().equals(((PrimaryDataSourceChangedEvent) event).getQualifiedDatabase().getGroupName())) {
-                    entry.getValue().changePrimaryDataSourceName(((PrimaryDataSourceChangedEvent) event).getQualifiedDatabase().getDataSourceName());
-                }
+        }
+    }
+    
+    @Override
+    public void updateStatus(final DataSourceStatusChangedEvent event) {
+        StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
+        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+            if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {
+                entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
+            } else {
+                entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
             }
         }
     }
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 80c7cb585c0..d6bc225026d 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -116,12 +116,10 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
     
     @Override
     public void updateStatus(final DataSourceStatusChangedEvent event) {
-        if (event instanceof StorageNodeDataSourceChangedEvent) {
-            for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
-                StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
-                entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
-                        StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
-            }
+        for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
+            StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
+            entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
+                    StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
         }
     }
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
new file mode 100644
index 00000000000..eb7eba3bc2d
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule.identifier.type;
+
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+
+/**
+ * Restart heart beat job rule.
+ */
+public interface RestartHeartBeatJobRule extends ShardingSphereRule {
+    
+    /**
+     * Restart heart beat job.
+     * @param event data source status changed event
+     * @param instanceContext instance context
+     */
+    void restart(DataSourceStatusChangedEvent event, InstanceContext instanceContext);
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 79698ae9c0b..690ef0b76cd 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecu
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -192,9 +193,9 @@ public final class ClusterContextManagerCoordinator {
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
         contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
                 .stream()
-                .filter(each -> each instanceof StatusContainedRule)
-                .forEach(each -> ((StatusContainedRule) each)
-                        .updateStatus(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+                .filter(each -> each instanceof RestartHeartBeatJobRule)
+                .forEach(each -> ((RestartHeartBeatJobRule) each)
+                        .restart(new PrimaryDataSourceChangedEvent(qualifiedDatabase), contextManager.getInstanceContext()));
     }
     
     /**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 490463a275f..3648d1079e2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -44,8 +44,9 @@ import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
@@ -278,15 +279,15 @@ public final class ClusterContextManagerCoordinatorTest {
     @Test
     public void assertRenewPrimaryDataSourceName() {
         Collection<ShardingSphereRule> rules = new LinkedList<>();
-        StatusContainedRule mockStatusContainedRule = mock(StatusContainedRule.class);
-        rules.add(mockStatusContainedRule);
+        RestartHeartBeatJobRule mockRestartHeartBeatJobRule = mock(RestartHeartBeatJobRule.class);
+        rules.add(mockRestartHeartBeatJobRule);
         ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
         when(database.getRuleMetaData()).thenReturn(ruleMetaData);
         contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database);
         PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
         coordinator.renew(mockPrimaryStateChangedEvent);
-        verify(mockStatusContainedRule).updateStatus(any());
+        verify(mockRestartHeartBeatJobRule).restart(any(), any());
     }
     
     @Test