You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/06/16 09:02:21 UTC
[shardingsphere] branch master updated: Dynamically update
configurations of cluster (#6062)
This is an automated email from the ASF dual-hosted git repository.
kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 55fff10 Dynamically update configurations of cluster (#6062)
55fff10 is described below
commit 55fff101aa89ff15d148f8e618e8432f294ff65e
Author: Haoran Meng <lo...@163.com>
AuthorDate: Tue Jun 16 17:02:04 2020 +0800
Dynamically update configurations of cluster (#6062)
* Dynamically update configurations of cluster
* Add unit tests
* revise comments
* Rename method
* Add final for new class
---
.../cluster/facade/ClusterFacade.java | 25 +++++++-
.../heartbeat/ClusterHeartbeatInstance.java | 8 +++
.../cluster/heartbeat/detect/HeartbeatHandler.java | 11 +++-
.../heartbeat/task/HeartbeatTaskManager.java | 9 +++
.../CuratorZookeeperCenterRepositoryTest.java | 4 +-
.../event/ClusterConfigurationChangedEvent.java | 32 ++++++++++
.../ClusterConfigurationChangedListener.java | 45 ++++++++++++++
.../ConfigurationChangedListenerManager.java | 4 ++
.../core/configcenter/ConfigCenterTest.java | 39 ++++++++++++
.../ClusterConfigurationChangedListenerTest.java | 70 ++++++++++++++++++++++
.../src/test/resources/yaml/data-cluster.yaml | 24 ++++++++
11 files changed, 265 insertions(+), 6 deletions(-)
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
index c523c96..9477e17 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
@@ -19,8 +19,7 @@ package org.apache.shardingsphere.cluster.facade;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
import org.apache.shardingsphere.cluster.heartbeat.ClusterHeartbeatInstance;
import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
@@ -30,6 +29,8 @@ import org.apache.shardingsphere.cluster.state.DataSourceState;
import org.apache.shardingsphere.cluster.state.InstanceState;
import org.apache.shardingsphere.cluster.state.enums.NodeState;
import org.apache.shardingsphere.kernel.context.SchemaContext;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.eventbus.ShardingOrchestrationEventBus;
import java.util.Collection;
import java.util.HashMap;
@@ -38,13 +39,16 @@ import java.util.Map;
/**
* Cluster facade.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ClusterFacade {
private ClusterHeartbeatInstance clusterHeartbeatInstance;
private ClusterStateInstance clusterStateInstance;
+ private ClusterFacade() {
+ ShardingOrchestrationEventBus.getInstance().register(this);
+ }
+
/**
* Init cluster facade.
*
@@ -76,6 +80,21 @@ public final class ClusterFacade {
reportHeartbeat(heartbeatResponse);
}
+ /**
+ * Renew cluster facade.
+ *
+ * @param event cluster configuration changed event
+ */
+ @Subscribe
+ public void renew(final ClusterConfigurationChangedEvent event) {
+ stop();
+ init(event.getClusterConfiguration());
+ }
+
+ private void stop() {
+ clusterHeartbeatInstance.close();
+ }
+
private InstanceState buildInstanceState(final HeartbeatResponse heartbeatResponse) {
InstanceState instanceState = clusterStateInstance.loadInstanceState();
return new InstanceState(instanceState.getState(), buildDataSourceStateMap(instanceState, heartbeatResponse));
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
index 7f038cb..0b3f9e9 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
@@ -70,6 +70,14 @@ public final class ClusterHeartbeatInstance {
return heartbeatHandler.handle(schemaContexts, ShardingOrchestrationFacade.getInstance().getRegistryCenter().loadAllDataSourcesNodes());
}
+ /**
+ * Close cluster heartbeat instance.
+ */
+ public void close() {
+ heartbeatTaskManager.close();
+ heartbeatHandler.close();
+ }
+
private static final class ClusterHeartbeatInstanceHolder {
private static final ClusterHeartbeatInstance INSTANCE = new ClusterHeartbeatInstance();
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
index ac4c81a..3970a0b 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
@@ -54,6 +54,8 @@ public final class HeartbeatHandler {
private Collection<String> disabledDataSources = Collections.emptyList();
+ private ExecutorService executorService;
+
/**
* Init heartbeat handler.
*
@@ -82,7 +84,7 @@ public final class HeartbeatHandler {
*/
public HeartbeatResponse handle(final Map<String, SchemaContext> schemaContexts, final Collection<String> disabledDataSources) {
this.disabledDataSources = disabledDataSources;
- ExecutorService executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
+ executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
List<Future<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
schemaContexts.forEach((key, value) -> value.getSchema().getDataSources().forEach((innerKey, innerValue) -> {
futureTasks.add(executorService.submit(new HeartbeatDetect(key, innerKey, innerValue, configuration, isDisabled(key, innerKey))));
@@ -92,6 +94,13 @@ public final class HeartbeatHandler {
return heartbeatResponse;
}
+ /**
+ * Close heartbeat handler.
+ */
+ public void close() {
+ closeExecutor(executorService);
+ }
+
private HeartbeatResponse buildHeartbeatResponse(final List<Future<Map<String, HeartbeatResult>>> futureTasks) {
Map<String, Collection<HeartbeatResult>> heartbeatResultMap = futureTasks.stream().map(e -> {
try {
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
index 9a826cf..58809da 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
@@ -45,4 +45,13 @@ public final class HeartbeatTaskManager {
Preconditions.checkNotNull(heartbeatTask, "task can not be null");
executorService.scheduleAtFixedRate(heartbeatTask, 0L, interval, TimeUnit.SECONDS);
}
+
+ /**
+ * Close heartbeat task manager.
+ */
+ public void close() {
+ if (null != executorService && !executorService.isShutdown()) {
+ executorService.shutdown();
+ }
+ }
}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/ [...]
index 3ae00b1..d90c8db 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java
@@ -136,7 +136,7 @@ public final class CuratorZookeeperCenterRepositoryTest {
props.setProperty(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey(), "1000");
props.setProperty(ZookeeperPropertyKey.MAX_RETRIES.getKey(), "1");
props.setProperty(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "1000");
- props.setProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "1000");
+ props.setProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "2000");
CenterConfiguration configuration = new CenterConfiguration(customCenterRepository.getType(), new Properties());
configuration.setServerLists(serverLists);
customCenterRepository.setProps(props);
@@ -144,7 +144,7 @@ public final class CuratorZookeeperCenterRepositoryTest {
assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey()), is("1000"));
assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.MAX_RETRIES.getKey()), is("1"));
assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey()), is("1000"));
- assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey()), is("1000"));
+ assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey()), is("2000"));
customCenterRepository.persist("/test/children/build/1", "value1");
assertThat(customCenterRepository.get("/test/children/build/1"), is("value1"));
}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java
new file mode 100644
index 0000000..4195df9
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.common.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
+
+/**
+ * Cluster configuration changed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ClusterConfigurationChangedEvent implements ShardingOrchestrationEvent {
+
+ private final ClusterConfiguration clusterConfiguration;
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListener.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listen [...]
new file mode 100644
index 0000000..9251925
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.configcenter.listener;
+
+import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.orchestration.center.ConfigCenterRepository;
+import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.listener.PostShardingCenterRepositoryEventListener;
+import org.apache.shardingsphere.orchestration.core.configcenter.ConfigCenterNode;
+
+import java.util.Collections;
+
+/**
+ * Cluster configuration changed listener.
+ */
+public final class ClusterConfigurationChangedListener extends PostShardingCenterRepositoryEventListener {
+
+ public ClusterConfigurationChangedListener(final String name, final ConfigCenterRepository configCenterRepository) {
+ super(configCenterRepository, Collections.singletonList(new ConfigCenterNode(name).getClusterPath()));
+ }
+
+ @Override
+ protected ClusterConfigurationChangedEvent createShardingOrchestrationEvent(final DataChangedEvent event) {
+ return new ClusterConfigurationChangedEvent(new ClusterConfigurationYamlSwapper()
+ .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlClusterConfiguration.class)));
+ }
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listen [...]
index 9befa52..497ba0d 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java
@@ -35,11 +35,14 @@ public final class ConfigurationChangedListenerManager {
private final MetricsConfigurationChangedListener metricsConfigurationChangedListener;
+ private final ClusterConfigurationChangedListener clusterConfigurationChangedListener;
+
public ConfigurationChangedListenerManager(final String name, final ConfigCenterRepository configCenterRepository, final Collection<String> shardingSchemaNames) {
schemaChangedListener = new SchemaChangedListener(name, configCenterRepository, shardingSchemaNames);
propertiesChangedListener = new PropertiesChangedListener(name, configCenterRepository);
authenticationChangedListener = new AuthenticationChangedListener(name, configCenterRepository);
metricsConfigurationChangedListener = new MetricsConfigurationChangedListener(name, configCenterRepository);
+ clusterConfigurationChangedListener = new ClusterConfigurationChangedListener(name, configCenterRepository);
}
/**
@@ -50,5 +53,6 @@ public final class ConfigurationChangedListenerManager {
propertiesChangedListener.watch(ChangedType.UPDATED);
authenticationChangedListener.watch(ChangedType.UPDATED);
metricsConfigurationChangedListener.watch(ChangedType.UPDATED);
+ clusterConfigurationChangedListener.watch(ChangedType.UPDATED);
}
}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
index ddb2534..ceb7ab3 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
@@ -17,7 +17,11 @@
package org.apache.shardingsphere.orchestration.core.configcenter;
+import lombok.SneakyThrows;
import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
+import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
import org.apache.shardingsphere.encrypt.api.config.EncryptRuleConfiguration;
import org.apache.shardingsphere.infra.auth.Authentication;
import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
@@ -43,6 +47,8 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -54,6 +60,8 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -216,6 +224,8 @@ public final class ConfigCenterTest {
+ " - set names utf8mb4;\n"
+ " - set names utf8;\n";
+ private static final String DATA_CLUSTER_YAML = "yaml/data-cluster.yaml";
+
@Mock
private ConfigCenterRepository configCenterRepository;
@@ -573,6 +583,30 @@ public final class ConfigCenterTest {
assertDataSourceConfigurationWithConnectionInitSqls(actual.get("ds_1"), createDataSourceConfiguration(createDataSourceWithConnectionInitSqls("ds_1")));
}
+ @Test
+ public void assertPersistClusterConfiguration() {
+ ClusterConfiguration clusterConfiguration = new ClusterConfigurationYamlSwapper()
+ .swapToObject(YamlEngine.unmarshal(readYAML(DATA_CLUSTER_YAML), YamlClusterConfiguration.class));
+ ConfigCenter configurationService = new ConfigCenter("test", configCenterRepository);
+ configurationService.persistClusterConfiguration(clusterConfiguration, true);
+ verify(configCenterRepository, times(0)).persist(eq("/test/config/cluster"), eq(readYAML(DATA_CLUSTER_YAML)));
+ }
+
+ @Test
+ public void loadClusterConfiguration() {
+ when(configCenterRepository.get("/test/config/cluster")).thenReturn(readYAML(DATA_CLUSTER_YAML));
+ ConfigCenter configurationService = new ConfigCenter("test", configCenterRepository);
+ ClusterConfiguration clusterConfiguration = configurationService.loadClusterConfiguration();
+ assertNotNull(clusterConfiguration);
+ assertNotNull(clusterConfiguration.getHeartbeat());
+ assertThat(clusterConfiguration.getHeartbeat().getSql(), is("select 1"));
+ assertThat(clusterConfiguration.getHeartbeat().getThreadCount(), is(1));
+ assertThat(clusterConfiguration.getHeartbeat().getInterval(), is(60));
+ assertFalse(clusterConfiguration.getHeartbeat().getRetryEnable());
+ assertThat(clusterConfiguration.getHeartbeat().getRetryMaximum(), is(3));
+ assertThat(clusterConfiguration.getHeartbeat().getRetryInterval(), is(3));
+ }
+
private DataSource createDataSourceWithConnectionInitSqls(final String name) {
BasicDataSource result = new BasicDataSource();
result.setDriverClassName("com.mysql.jdbc.Driver");
@@ -590,4 +624,9 @@ public final class ConfigCenterTest {
assertThat(actual.getProps().get("password"), is(expected.getProps().get("password")));
assertThat(actual.getProps().get("connectionInitSqls"), is(expected.getProps().get("connectionInitSqls")));
}
+
+ @SneakyThrows
+ private String readYAML(final String yamlFile) {
+ return Files.readAllLines(Paths.get(ClassLoader.getSystemResource(yamlFile).toURI())).stream().map(each -> each + System.lineSeparator()).collect(Collectors.joining());
+ }
}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListenerTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/li [...]
new file mode 100644
index 0000000..da040e1
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListenerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.configcenter.listener;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.orchestration.center.ConfigCenterRepository;
+import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class ClusterConfigurationChangedListenerTest {
+
+ private static final String DATA_CLUSTER_YAML = "yaml/data-cluster.yaml";
+
+ private ClusterConfigurationChangedListener clusterConfigurationChangedListener;
+
+ @Mock
+ private ConfigCenterRepository configCenterRepository;
+
+ @Before
+ public void setUp() {
+ clusterConfigurationChangedListener = new ClusterConfigurationChangedListener("test", configCenterRepository);
+ }
+
+ @Test
+ public void assertCreateShardingOrchestrationEvent() {
+ ClusterConfigurationChangedEvent event = clusterConfigurationChangedListener
+ .createShardingOrchestrationEvent(new DataChangedEvent("test", readYAML(DATA_CLUSTER_YAML), DataChangedEvent.ChangedType.UPDATED));
+ assertNotNull(event);
+ assertNotNull(event.getClusterConfiguration());
+ assertNotNull(event.getClusterConfiguration().getHeartbeat());
+ assertThat(event.getClusterConfiguration().getHeartbeat().getSql(), is("select 1"));
+ assertThat(event.getClusterConfiguration().getHeartbeat().getThreadCount(), is(1));
+ assertThat(event.getClusterConfiguration().getHeartbeat().getInterval(), is(60));
+ assertFalse(event.getClusterConfiguration().getHeartbeat().getRetryEnable());
+ assertThat(event.getClusterConfiguration().getHeartbeat().getRetryMaximum(), is(3));
+ assertThat(event.getClusterConfiguration().getHeartbeat().getRetryInterval(), is(3));
+ }
+
+ @SneakyThrows
+ private String readYAML(final String yamlFile) {
+ return Files.readAllLines(Paths.get(ClassLoader.getSystemResource(yamlFile).toURI())).stream().map(each -> each + System.lineSeparator()).collect(Collectors.joining());
+ }
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml
new file mode 100644
index 0000000..7fb1e3f
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+heartbeat:
+ sql: select 1
+ threadCount: 1
+ interval: 60
+ retryEnable: false
+ retryMaximum: 3
+ retryInterval: 3