You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/06/16 09:02:21 UTC

[shardingsphere] branch master updated: Dynamically update configurations of cluster (#6062)

This is an automated email from the ASF dual-hosted git repository.

kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 55fff10  Dynamically update configurations of cluster (#6062)
55fff10 is described below

commit 55fff101aa89ff15d148f8e618e8432f294ff65e
Author: Haoran Meng <lo...@163.com>
AuthorDate: Tue Jun 16 17:02:04 2020 +0800

    Dynamically update configurations of cluster (#6062)
    
    * Dynamically update configurations of cluster
    
    * Add unit tests
    
    * Revise comments
    
    * Rename method
    
    * Add final for new class
---
 .../cluster/facade/ClusterFacade.java              | 25 +++++++-
 .../heartbeat/ClusterHeartbeatInstance.java        |  8 +++
 .../cluster/heartbeat/detect/HeartbeatHandler.java | 11 +++-
 .../heartbeat/task/HeartbeatTaskManager.java       |  9 +++
 .../CuratorZookeeperCenterRepositoryTest.java      |  4 +-
 .../event/ClusterConfigurationChangedEvent.java    | 32 ++++++++++
 .../ClusterConfigurationChangedListener.java       | 45 ++++++++++++++
 .../ConfigurationChangedListenerManager.java       |  4 ++
 .../core/configcenter/ConfigCenterTest.java        | 39 ++++++++++++
 .../ClusterConfigurationChangedListenerTest.java   | 70 ++++++++++++++++++++++
 .../src/test/resources/yaml/data-cluster.yaml      | 24 ++++++++
 11 files changed, 265 insertions(+), 6 deletions(-)

diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
index c523c96..9477e17 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-facade/src/main/java/org/apache/shardingsphere/cluster/facade/ClusterFacade.java
@@ -19,8 +19,7 @@ package org.apache.shardingsphere.cluster.facade;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
 import org.apache.shardingsphere.cluster.heartbeat.ClusterHeartbeatInstance;
 import org.apache.shardingsphere.cluster.heartbeat.response.HeartbeatResponse;
@@ -30,6 +29,8 @@ import org.apache.shardingsphere.cluster.state.DataSourceState;
 import org.apache.shardingsphere.cluster.state.InstanceState;
 import org.apache.shardingsphere.cluster.state.enums.NodeState;
 import org.apache.shardingsphere.kernel.context.SchemaContext;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.eventbus.ShardingOrchestrationEventBus;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -38,13 +39,16 @@ import java.util.Map;
 /**
  * Cluster facade.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class ClusterFacade {
     
     private ClusterHeartbeatInstance clusterHeartbeatInstance;
     
     private ClusterStateInstance clusterStateInstance;
     
+    private ClusterFacade() {
+        ShardingOrchestrationEventBus.getInstance().register(this); // registers this facade on the orchestration event bus; renew is the @Subscribe method of this class
+    }
+    
     /**
      * Init cluster facade.
      *
@@ -76,6 +80,21 @@ public final class ClusterFacade {
         reportHeartbeat(heartbeatResponse);
     }
     
+    /**
+     * Renew cluster facade: close the current cluster heartbeat instance via stop, then call init with the configuration carried by the event.
+     *
+     * @param event cluster configuration changed event whose cluster configuration is passed to init
+     */
+    @Subscribe
+    public void renew(final ClusterConfigurationChangedEvent event) {
+        stop();
+        init(event.getClusterConfiguration());
+    }
+    
+    private void stop() {
+        clusterHeartbeatInstance.close(); // NOTE(review): the field has no initializer and the constructor does not assign it; presumably init sets it - confirm renew cannot fire before init
+    }
+    
     private InstanceState buildInstanceState(final HeartbeatResponse heartbeatResponse) {
         InstanceState instanceState = clusterStateInstance.loadInstanceState();
         return new InstanceState(instanceState.getState(), buildDataSourceStateMap(instanceState, heartbeatResponse));
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
index 7f038cb..0b3f9e9 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/ClusterHeartbeatInstance.java
@@ -70,6 +70,14 @@ public final class ClusterHeartbeatInstance {
         return heartbeatHandler.handle(schemaContexts, ShardingOrchestrationFacade.getInstance().getRegistryCenter().loadAllDataSourcesNodes());
     }
     
+    /**
+     * Close cluster heartbeat instance by closing the heartbeat task manager first and the heartbeat handler second.
+     */
+    public void close() {
+        heartbeatTaskManager.close();
+        heartbeatHandler.close();
+    }
+    
     private static final class ClusterHeartbeatInstanceHolder {
         
         private static final ClusterHeartbeatInstance INSTANCE = new ClusterHeartbeatInstance();
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
index ac4c81a..3970a0b 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/detect/HeartbeatHandler.java
@@ -54,6 +54,8 @@ public final class HeartbeatHandler {
     
     private Collection<String> disabledDataSources = Collections.emptyList();
     
+    private ExecutorService executorService;
+    
     /**
      * Init heartbeat handler.
      *
@@ -82,7 +84,7 @@ public final class HeartbeatHandler {
      */
     public HeartbeatResponse handle(final Map<String, SchemaContext> schemaContexts, final Collection<String> disabledDataSources) {
         this.disabledDataSources = disabledDataSources;
-        ExecutorService executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
+        executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
         List<Future<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
         schemaContexts.forEach((key, value) -> value.getSchema().getDataSources().forEach((innerKey, innerValue) -> {
             futureTasks.add(executorService.submit(new HeartbeatDetect(key, innerKey, innerValue, configuration, isDisabled(key, innerKey))));
@@ -92,6 +94,13 @@ public final class HeartbeatHandler {
         return heartbeatResponse;
     }
     
+    /**
+     * Close heartbeat handler by passing the executor service field to closeExecutor.
+     */
+    public void close() {
+        closeExecutor(executorService); // NOTE(review): executorService is assigned in handle(); it may still be null here if handle() has not run - confirm closeExecutor tolerates null
+    }
+    
     private HeartbeatResponse buildHeartbeatResponse(final List<Future<Map<String, HeartbeatResult>>> futureTasks) {
         Map<String, Collection<HeartbeatResult>> heartbeatResultMap = futureTasks.stream().map(e -> {
             try {
diff --git a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
index 9a826cf..58809da 100644
--- a/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
+++ b/shardingsphere-control-panel/shardingsphere-cluster/shardingsphere-cluster-heartbeat/src/main/java/org/apache/shardingsphere/cluster/heartbeat/task/HeartbeatTaskManager.java
@@ -45,4 +45,13 @@ public final class HeartbeatTaskManager {
         Preconditions.checkNotNull(heartbeatTask, "task can not be null");
         executorService.scheduleAtFixedRate(heartbeatTask, 0L, interval, TimeUnit.SECONDS);
     }
+    
+    /**
+     * Close heartbeat task manager: shut down the executor service unless it is null or already shut down.
+     */
+    public void close() {
+        if (null != executorService && !executorService.isShutdown()) {
+            executorService.shutdown();
+        }
+    }
 }
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/ [...]
index 3ae00b1..d90c8db 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-center/shardingsphere-orchestration-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperCenterRepositoryTest.java
@@ -136,7 +136,7 @@ public final class CuratorZookeeperCenterRepositoryTest {
         props.setProperty(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey(), "1000");
         props.setProperty(ZookeeperPropertyKey.MAX_RETRIES.getKey(), "1");
         props.setProperty(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "1000");
-        props.setProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "1000");
+        props.setProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "2000");
         CenterConfiguration configuration = new CenterConfiguration(customCenterRepository.getType(), new Properties());
         configuration.setServerLists(serverLists);
         customCenterRepository.setProps(props);
@@ -144,7 +144,7 @@ public final class CuratorZookeeperCenterRepositoryTest {
         assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey()), is("1000"));
         assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.MAX_RETRIES.getKey()), is("1"));
         assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey()), is("1000"));
-        assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey()), is("1000"));
+        assertThat(customCenterRepository.getProps().getProperty(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey()), is("2000"));
         customCenterRepository.persist("/test/children/build/1", "value1");
         assertThat(customCenterRepository.get("/test/children/build/1"), is("value1"));
     }
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java
new file mode 100644
index 0000000..4195df9
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-common/src/main/java/org/apache/shardingsphere/orchestration/core/common/event/ClusterConfigurationChangedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.common.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
+
+/**
+ * Cluster configuration changed event, holding the cluster configuration it was constructed with.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ClusterConfigurationChangedEvent implements ShardingOrchestrationEvent {
+    
+    private final ClusterConfiguration clusterConfiguration;
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListener.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listen [...]
new file mode 100644
index 0000000..9251925
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.configcenter.listener;
+
+import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.orchestration.center.ConfigCenterRepository;
+import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.listener.PostShardingCenterRepositoryEventListener;
+import org.apache.shardingsphere.orchestration.core.configcenter.ConfigCenterNode;
+
+import java.util.Collections;
+
+/**
+ * Cluster configuration changed listener: created for the cluster path of the named config center node, it converts the YAML value of a data changed event into a cluster configuration changed event.
+ */
+public final class ClusterConfigurationChangedListener extends PostShardingCenterRepositoryEventListener {
+    
+    public ClusterConfigurationChangedListener(final String name, final ConfigCenterRepository configCenterRepository) {
+        super(configCenterRepository, Collections.singletonList(new ConfigCenterNode(name).getClusterPath()));
+    }
+    
+    @Override
+    protected ClusterConfigurationChangedEvent createShardingOrchestrationEvent(final DataChangedEvent event) {
+        return new ClusterConfigurationChangedEvent(new ClusterConfigurationYamlSwapper()
+                .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlClusterConfiguration.class)));
+    }
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listen [...]
index 9befa52..497ba0d 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/main/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ConfigurationChangedListenerManager.java
@@ -35,11 +35,14 @@ public final class ConfigurationChangedListenerManager {
     
     private final MetricsConfigurationChangedListener metricsConfigurationChangedListener;
     
+    private final ClusterConfigurationChangedListener clusterConfigurationChangedListener;
+    
     public ConfigurationChangedListenerManager(final String name, final ConfigCenterRepository configCenterRepository, final Collection<String> shardingSchemaNames) {
         schemaChangedListener = new SchemaChangedListener(name, configCenterRepository, shardingSchemaNames);
         propertiesChangedListener = new PropertiesChangedListener(name, configCenterRepository);
         authenticationChangedListener = new AuthenticationChangedListener(name, configCenterRepository);
         metricsConfigurationChangedListener = new MetricsConfigurationChangedListener(name, configCenterRepository);
+        clusterConfigurationChangedListener = new ClusterConfigurationChangedListener(name, configCenterRepository);
     }
     
     /**
@@ -50,5 +53,6 @@ public final class ConfigurationChangedListenerManager {
         propertiesChangedListener.watch(ChangedType.UPDATED);
         authenticationChangedListener.watch(ChangedType.UPDATED);
         metricsConfigurationChangedListener.watch(ChangedType.UPDATED);
+        clusterConfigurationChangedListener.watch(ChangedType.UPDATED);
     }
 }
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
index ddb2534..ceb7ab3 100644
--- a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/ConfigCenterTest.java
@@ -17,7 +17,11 @@
 
 package org.apache.shardingsphere.orchestration.core.configcenter;
 
+import lombok.SneakyThrows;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.shardingsphere.cluster.configuration.config.ClusterConfiguration;
+import org.apache.shardingsphere.cluster.configuration.swapper.ClusterConfigurationYamlSwapper;
+import org.apache.shardingsphere.cluster.configuration.yaml.YamlClusterConfiguration;
 import org.apache.shardingsphere.encrypt.api.config.EncryptRuleConfiguration;
 import org.apache.shardingsphere.infra.auth.Authentication;
 import org.apache.shardingsphere.infra.auth.yaml.config.YamlAuthenticationConfiguration;
@@ -43,6 +47,8 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -54,6 +60,8 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
@@ -216,6 +224,8 @@ public final class ConfigCenterTest {
             + "        - set names utf8mb4;\n"
             + "        - set names utf8;\n";
     
+    private static final String DATA_CLUSTER_YAML = "yaml/data-cluster.yaml";
+    
     @Mock
     private ConfigCenterRepository configCenterRepository;
     
@@ -573,6 +583,30 @@ public final class ConfigCenterTest {
         assertDataSourceConfigurationWithConnectionInitSqls(actual.get("ds_1"), createDataSourceConfiguration(createDataSourceWithConnectionInitSqls("ds_1")));
     }
     
+    @Test
+    public void assertPersistClusterConfiguration() {
+        ClusterConfiguration clusterConfiguration = new ClusterConfigurationYamlSwapper()
+                .swapToObject(YamlEngine.unmarshal(readYAML(DATA_CLUSTER_YAML), YamlClusterConfiguration.class));
+        ConfigCenter configurationService = new ConfigCenter("test", configCenterRepository);
+        configurationService.persistClusterConfiguration(clusterConfiguration, true);
+        verify(configCenterRepository, times(0)).persist(eq("/test/config/cluster"), eq(readYAML(DATA_CLUSTER_YAML))); // NOTE(review): times(0) only proves persist was never called with the raw file text (license header included) - confirm this is the intended assertion
+    }
+    
+    @Test
+    public void loadClusterConfiguration() {
+        when(configCenterRepository.get("/test/config/cluster")).thenReturn(readYAML(DATA_CLUSTER_YAML));
+        ConfigCenter configurationService = new ConfigCenter("test", configCenterRepository);
+        ClusterConfiguration clusterConfiguration = configurationService.loadClusterConfiguration();
+        assertNotNull(clusterConfiguration);
+        assertNotNull(clusterConfiguration.getHeartbeat());
+        assertThat(clusterConfiguration.getHeartbeat().getSql(), is("select 1"));
+        assertThat(clusterConfiguration.getHeartbeat().getThreadCount(), is(1));
+        assertThat(clusterConfiguration.getHeartbeat().getInterval(), is(60));
+        assertFalse(clusterConfiguration.getHeartbeat().getRetryEnable());
+        assertThat(clusterConfiguration.getHeartbeat().getRetryMaximum(), is(3));
+        assertThat(clusterConfiguration.getHeartbeat().getRetryInterval(), is(3));
+    }
+    
     private DataSource createDataSourceWithConnectionInitSqls(final String name) {
         BasicDataSource result = new BasicDataSource();
         result.setDriverClassName("com.mysql.jdbc.Driver");
@@ -590,4 +624,9 @@ public final class ConfigCenterTest {
         assertThat(actual.getProps().get("password"), is(expected.getProps().get("password")));
         assertThat(actual.getProps().get("connectionInitSqls"), is(expected.getProps().get("connectionInitSqls")));
     }
+    
+    @SneakyThrows
+    private String readYAML(final String yamlFile) {
+        return Files.readAllLines(Paths.get(ClassLoader.getSystemResource(yamlFile).toURI())).stream().map(each -> each + System.lineSeparator()).collect(Collectors.joining());
+    }
 }
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListenerTest.java b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/li [...]
new file mode 100644
index 0000000..da040e1
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/java/org/apache/shardingsphere/orchestration/core/configcenter/listener/ClusterConfigurationChangedListenerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.core.configcenter.listener;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.orchestration.center.ConfigCenterRepository;
+import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent;
+import org.apache.shardingsphere.orchestration.core.common.event.ClusterConfigurationChangedEvent;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class ClusterConfigurationChangedListenerTest {
+    
+    private static final String DATA_CLUSTER_YAML = "yaml/data-cluster.yaml";
+    
+    private ClusterConfigurationChangedListener clusterConfigurationChangedListener;
+    
+    @Mock // NOTE(review): no Mockito runner or initMocks call appears in this class, so this field is presumably still null in setUp - confirm
+    private ConfigCenterRepository configCenterRepository;
+    
+    @Before
+    public void setUp() {
+        clusterConfigurationChangedListener = new ClusterConfigurationChangedListener("test", configCenterRepository);
+    }
+    
+    @Test
+    public void assertCreateShardingOrchestrationEvent() {
+        ClusterConfigurationChangedEvent event = clusterConfigurationChangedListener
+                .createShardingOrchestrationEvent(new DataChangedEvent("test", readYAML(DATA_CLUSTER_YAML), DataChangedEvent.ChangedType.UPDATED));
+        assertNotNull(event);
+        assertNotNull(event.getClusterConfiguration());
+        assertNotNull(event.getClusterConfiguration().getHeartbeat());
+        assertThat(event.getClusterConfiguration().getHeartbeat().getSql(), is("select 1"));
+        assertThat(event.getClusterConfiguration().getHeartbeat().getThreadCount(), is(1));
+        assertThat(event.getClusterConfiguration().getHeartbeat().getInterval(), is(60));
+        assertFalse(event.getClusterConfiguration().getHeartbeat().getRetryEnable());
+        assertThat(event.getClusterConfiguration().getHeartbeat().getRetryMaximum(), is(3));
+        assertThat(event.getClusterConfiguration().getHeartbeat().getRetryInterval(), is(3));
+    }
+    
+    @SneakyThrows
+    private String readYAML(final String yamlFile) {
+        return Files.readAllLines(Paths.get(ClassLoader.getSystemResource(yamlFile).toURI())).stream().map(each -> each + System.lineSeparator()).collect(Collectors.joining());
+    }
+}
diff --git a/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml
new file mode 100644
index 0000000..7fb1e3f
--- /dev/null
+++ b/shardingsphere-control-panel/shardingsphere-orchestration/shardingsphere-orchestration-core/shardingsphere-orchestration-core-configcenter/src/test/resources/yaml/data-cluster.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+heartbeat:
+  sql: select 1
+  threadCount: 1
+  interval: 60
+  retryEnable: false
+  retryMaximum: 3
+  retryInterval: 3