You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/07/05 10:19:21 UTC
[shardingsphere] branch master updated: Restart heartbeat job when primary data source has changed. (#18858)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 45145a16bdb Restart heartbeat job when primary data source has changed. (#18858)
45145a16bdb is described below
commit 45145a16bdbd9aa439f2c06992f90d91d89f8ad8
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Tue Jul 5 18:19:16 2022 +0800
Restart heartbeat job when primary data source has changed. (#18858)
* Restart heartbeat job when primary datasource has changed
* restart heart beat job
* Fix ci
* Fix ci
* Fix bug
* Remove unless import
---
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 77 +++++++++++-----------
.../rule/ReadwriteSplittingRule.java | 10 ++-
.../identifier/type/RestartHeartBeatJobRule.java | 35 ++++++++++
.../ClusterContextManagerCoordinator.java | 7 +-
.../ClusterContextManagerCoordinatorTest.java | 9 +--
5 files changed, 87 insertions(+), 51 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 53544205984..52b29a6350f 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.dbdiscovery.rule;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.shardingsphere.dbdiscovery.algorithm.DatabaseDiscoveryEngine;
@@ -32,9 +33,11 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
import org.apache.shardingsphere.infra.datasource.strategy.DynamicDataSourceStrategyFactory;
import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -53,12 +56,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
-import java.util.stream.Collectors;
/**
* Database discovery rule.
*/
-public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, StatusContainedRule, ExportableRule {
+public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule, ExportableRule {
@Getter
private final RuleConfiguration configuration;
@@ -79,8 +81,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
discoveryTypes = getDiscoveryProviderAlgorithms(ruleConfig.getDiscoveryTypes());
dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());
findPrimaryReplicaRelationship(databaseName, dataSourceMap);
- initAware();
- initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+ initAwareAndHeartBeatJobs(instanceContext);
}
public DatabaseDiscoveryRule(final String databaseName,
@@ -91,8 +92,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
discoveryTypes = ruleConfig.getDiscoveryTypes();
dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());
findPrimaryReplicaRelationship(databaseName, dataSourceMap);
- initAware();
- initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+ initAwareAndHeartBeatJobs(instanceContext);
}
private static Map<String, DatabaseDiscoveryProviderAlgorithm> getDiscoveryProviderAlgorithms(final Map<String, ShardingSphereAlgorithmConfiguration> discoveryTypesConfig) {
@@ -125,25 +125,6 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
}
}
- private void initAware() {
- DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> optional.init(this));
- }
-
- private void initHeartBeatJobs(final String instanceId) {
- Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get(instanceId);
- if (modeScheduleContext.isPresent()) {
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
- DatabaseDiscoveryDataSourceRule rule = entry.getValue();
- Map<String, DataSource> dataSources = dataSourceMap.entrySet().stream().filter(each -> !rule.getDisabledDataSourceNames().contains(each.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();
- CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), dataSources,
- rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames()).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron"));
- modeScheduleContext.get().startCronJob(job);
- }
- }
- }
-
/**
* Get single data source rule.
*
@@ -173,21 +154,41 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
}
@Override
- public void updateStatus(final DataSourceStatusChangedEvent event) {
- if (event instanceof StorageNodeDataSourceChangedEvent) {
+ public void restart(final DataSourceStatusChangedEvent event, final InstanceContext instanceContext) {
+ PrimaryDataSourceChangedEvent dataSourceEvent = (PrimaryDataSourceChangedEvent) event;
+ QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
+ DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());
+ Preconditions.checkState(null != dataSourceRule, "Can 't find database discovery data source rule in database `%s`.", databaseName);
+ dataSourceRule.changePrimaryDataSourceName(qualifiedDatabase.getDataSourceName());
+ initAwareAndHeartBeatJobs(instanceContext);
+ }
+
+ private void initAwareAndHeartBeatJobs(final InstanceContext instanceContext) {
+ DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> optional.init(this));
+ initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+ }
+
+ private void initHeartBeatJobs(final String instanceId) {
+ Optional<ModeScheduleContext> modeScheduleContext = ModeScheduleContextFactory.getInstance().get(instanceId);
+ if (modeScheduleContext.isPresent()) {
for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
- StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
- if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {
- entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
- } else {
- entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
- }
+ DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+ String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();
+ CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), dataSourceMap,
+ rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames()).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+ modeScheduleContext.get().startCronJob(job);
}
- } else if (event instanceof PrimaryDataSourceChangedEvent) {
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
- if (entry.getValue().getGroupName().equals(((PrimaryDataSourceChangedEvent) event).getQualifiedDatabase().getGroupName())) {
- entry.getValue().changePrimaryDataSourceName(((PrimaryDataSourceChangedEvent) event).getQualifiedDatabase().getDataSourceName());
- }
+ }
+ }
+
+ @Override
+ public void updateStatus(final DataSourceStatusChangedEvent event) {
+ StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
+ for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+ if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {
+ entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
+ } else {
+ entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
}
}
}
diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 80c7cb585c0..d6bc225026d 100644
--- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -116,12 +116,10 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
@Override
public void updateStatus(final DataSourceStatusChangedEvent event) {
- if (event instanceof StorageNodeDataSourceChangedEvent) {
- for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
- StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
- entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
- StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
- }
+ for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
+ StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;
+ entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
+ StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
new file mode 100644
index 00000000000..eb7eba3bc2d
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule.identifier.type;
+
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+
+/**
+ * Restart heart beat job rule.
+ */
+public interface RestartHeartBeatJobRule extends ShardingSphereRule {
+
+ /**
+ * Restart heart beat job.
+ * @param event data source status changed event
+ * @param instanceContext instance context
+ */
+ void restart(DataSourceStatusChangedEvent event, InstanceContext instanceContext);
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 79698ae9c0b..690ef0b76cd 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecu
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -192,9 +193,9 @@ public final class ClusterContextManagerCoordinator {
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
.stream()
- .filter(each -> each instanceof StatusContainedRule)
- .forEach(each -> ((StatusContainedRule) each)
- .updateStatus(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+ .filter(each -> each instanceof RestartHeartBeatJobRule)
+ .forEach(each -> ((RestartHeartBeatJobRule) each)
+ .restart(new PrimaryDataSourceChangedEvent(qualifiedDatabase), contextManager.getInstanceContext()));
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 490463a275f..3648d1079e2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -44,8 +44,9 @@ import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
@@ -278,15 +279,15 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewPrimaryDataSourceName() {
Collection<ShardingSphereRule> rules = new LinkedList<>();
- StatusContainedRule mockStatusContainedRule = mock(StatusContainedRule.class);
- rules.add(mockStatusContainedRule);
+ RestartHeartBeatJobRule mockRestartHeartBeatJobRule = mock(RestartHeartBeatJobRule.class);
+ rules.add(mockRestartHeartBeatJobRule);
ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules);
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
when(database.getRuleMetaData()).thenReturn(ruleMetaData);
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database);
PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
coordinator.renew(mockPrimaryStateChangedEvent);
- verify(mockStatusContainedRule).updateStatus(any());
+ verify(mockRestartHeartBeatJobRule).restart(any(), any());
}
@Test