You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/11/21 02:19:03 UTC

[shardingsphere] branch master updated: Refactor enable and disable instance state (#22291)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0160f941322 Refactor enable and disable instance state (#22291)
0160f941322 is described below

commit 0160f941322ddede730f3acc6b8cea8cae153940
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon Nov 21 10:18:54 2022 +0800

    Refactor enable and disable instance state (#22291)
---
 .../infra/instance/ComputeNodeInstance.java        |  4 +-
 .../infra/instance/InstanceContext.java            |  4 +-
 .../infra/instance/InstanceContextTest.java        |  5 +-
 .../cluster/coordinator/RegistryCenter.java        |  1 +
 .../registry/status/compute/ComputeNodeStatus.java | 26 ---------
 .../event/ComputeNodeStatusChangedEvent.java       |  4 +-
 .../registry/status/compute/event/StateEvent.java  |  4 +-
 .../compute/service/ComputeNodeStatusService.java  | 16 +++++-
 .../subscriber/ComputeNodeStatusSubscriber.java    | 15 +----
 .../watcher/ComputeNodeStateChangedWatcher.java    | 65 +++++++++++-----------
 .../service/ComputeNodeStatusServiceTest.java      | 11 ++++
 .../ComputeNodeStateChangedWatcherTest.java        | 25 ++++-----
 .../subscriber/StateChangedSubscriberTest.java     |  7 +--
 .../listener/SessionConnectionListener.java        | 14 ++++-
 .../ral/updatable/SetInstanceStatusHandler.java    |  3 +-
 15 files changed, 92 insertions(+), 112 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 54d594d7fe5..70f8647766d 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -62,8 +62,8 @@ public final class ComputeNodeInstance {
      *
      * @param status status
      */
-    public void switchState(final Collection<String> status) {
-        state.switchState(StateType.CIRCUIT_BREAK, null != status && status.contains(StateType.CIRCUIT_BREAK.name()));
+    public void switchState(final String status) {
+        state.switchState(StateType.CIRCUIT_BREAK, StateType.CIRCUIT_BREAK.name().equals(status));
     }
     
     /**
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 6a911b8227b..beab858732d 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -60,14 +60,14 @@ public final class InstanceContext {
      * @param instanceId instance id
      * @param status status
      */
-    public void updateInstanceStatus(final String instanceId, final Collection<String> status) {
+    public void updateInstanceStatus(final String instanceId, final String status) {
         if (instance.getMetaData().getId().equals(instanceId)) {
             instance.switchState(status);
         }
         updateRelatedComputeNodeInstancesStatus(instanceId, status);
     }
     
-    private void updateRelatedComputeNodeInstancesStatus(final String instanceId, final Collection<String> status) {
+    private void updateRelatedComputeNodeInstancesStatus(final String instanceId, final String status) {
         for (ComputeNodeInstance each : allClusterInstances) {
             if (each.getMetaData().getId().equals(instanceId)) {
                 each.switchState(status);
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index fba18d1c02d..83bf38a9a00 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.Properties;
 import java.util.Set;
@@ -55,10 +54,10 @@ public final class InstanceContextTest {
         InstanceContext context = new InstanceContext(new ComputeNodeInstance(instanceMetaData), new WorkerIdGeneratorFixture(Integer.MIN_VALUE), modeConfig, lockContext, eventBusContext);
         StateType actual = context.getInstance().getState().getCurrentState();
         assertThat(actual, is(StateType.OK));
-        context.updateInstanceStatus(instanceMetaData.getId(), Collections.singleton(StateType.CIRCUIT_BREAK.name()));
+        context.updateInstanceStatus(instanceMetaData.getId(), StateType.CIRCUIT_BREAK.name());
         actual = context.getInstance().getState().getCurrentState();
         assertThat(actual, is(StateType.CIRCUIT_BREAK));
-        context.updateInstanceStatus(instanceMetaData.getId(), Collections.emptyList());
+        context.updateInstanceStatus(instanceMetaData.getId(), StateType.OK.name());
         actual = context.getInstance().getState().getCurrentState();
         assertThat(actual, is(StateType.OK));
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index ec4e6f57b1b..081ee83bafa 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -95,6 +95,7 @@ public final class RegistryCenter {
     public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
         computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
         computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getLabels());
+        computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getState());
         listenerFactory.watchListeners();
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java
deleted file mode 100644
index 8ecb537ba89..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute;
-
-/**
- * Compute node status.
- */
-public enum ComputeNodeStatus {
-    
-    ONLINE, CIRCUIT_BREAK
-}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
index 8f443669bda..fd7d5191ebe 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.infra.state.StateType;
 
 /**
  * Compute node status changed event.
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 @Getter
 public final class ComputeNodeStatusChangedEvent {
     
-    private final ComputeNodeStatus status;
+    private final StateType state;
     
     private final String instanceId;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
index 79e81a3320b..54dcfa49401 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
@@ -21,8 +21,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
-import java.util.Collection;
-
 /**
  * State event.
  */
@@ -32,5 +30,5 @@ public final class StateEvent implements GovernanceEvent {
     
     private final String instanceId;
     
-    private final Collection<String> status;
+    private final String status;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 8265af790e8..35d8dbf57f4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataBuilderFactory;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.state.StateContext;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -65,6 +66,16 @@ public final class ComputeNodeStatusService {
         }
     }
     
+    /**
+     * Persist instance state.
+     *
+     * @param instanceId instance id
+     * @param state state context
+     */
+    public void persistInstanceState(final String instanceId, final StateContext state) {
+        repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId), state.getCurrentState().name());
+    }
+    
     /**
      * Persist instance worker id.
      *
@@ -94,9 +105,8 @@ public final class ComputeNodeStatusService {
      * @return status
      */
     @SuppressWarnings("unchecked")
-    public Collection<String> loadInstanceStatus(final String instanceId) {
-        String yamlContent = repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
-        return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+    public String loadInstanceStatus(final String instanceId) {
+        return repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 1799b1b6b8e..7f36f2b7acb 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -17,18 +17,13 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
 
-import com.google.common.base.Strings;
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 
 /**
@@ -54,15 +49,7 @@ public final class ComputeNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final ComputeNodeStatusChangedEvent event) {
-        String computeStatusNodePath = ComputeNode.getInstanceStatusNodePath(event.getInstanceId());
-        String yamlContext = repository.getDirectly(computeStatusNodePath);
-        Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
-        if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAK) {
-            status.add(ComputeNodeStatus.CIRCUIT_BREAK.name());
-        } else {
-            status.remove(ComputeNodeStatus.CIRCUIT_BREAK.name());
-        }
-        repository.persistEphemeral(computeStatusNodePath, YamlEngine.marshal(status));
+        repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(event.getInstanceId()), event.getState().name());
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index beb7170fa47..9bff63168e1 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -65,16 +65,15 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
     public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
         String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
         if (!Strings.isNullOrEmpty(instanceId)) {
-            if (event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId))) {
-                Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
-                return Optional.of(new StateEvent(instanceId, status));
+            if (event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId)) && Type.DELETED != event.getType()) {
+                return Optional.of(new StateEvent(instanceId, event.getValue()));
+            }
+            if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId)) && Type.DELETED != event.getType()) {
+                return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
             }
             if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
                 return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Integer.valueOf(event.getValue())));
             }
-            if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
-                return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
-            }
         }
         if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
             return createInstanceEvent(event);
@@ -88,20 +87,24 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
         return Optional.empty();
     }
     
-    private Optional<GovernanceEvent> createKillProcessListIdEvent(final DataChangedEvent event) {
-        Matcher matcher = getKillProcessListIdMatcher(event);
-        if (!matcher.find()) {
-            return Optional.empty();
-        }
-        if (Type.ADDED == event.getType()) {
-            return Optional.of(new KillProcessListIdEvent(matcher.group(1), matcher.group(2)));
-        }
-        if (Type.DELETED == event.getType()) {
-            return Optional.of(new KillProcessListIdUnitCompleteEvent(matcher.group(2)));
+    private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
+        Matcher matcher = matchInstanceOnlinePath(event.getKey());
+        if (matcher.find()) {
+            InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), event.getValue());
+            if (Type.ADDED == event.getType()) {
+                return Optional.of(new InstanceOnlineEvent(instanceMetaData));
+            }
+            if (Type.DELETED == event.getType()) {
+                return Optional.of(new InstanceOfflineEvent(instanceMetaData));
+            }
         }
         return Optional.empty();
     }
     
+    private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
+        return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
+    }
+    
     private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
         Matcher matcher = getShowProcessTriggerMatcher(event);
         if (!matcher.find()) {
@@ -120,26 +123,22 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
         return Pattern.compile(ComputeNode.getProcessTriggerNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
     }
     
-    private static Matcher getKillProcessListIdMatcher(final DataChangedEvent event) {
-        Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
-        return pattern.matcher(event.getKey());
-    }
-    
-    private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
-        Matcher matcher = matchInstanceOnlinePath(event.getKey());
-        if (matcher.find()) {
-            InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), event.getValue());
-            if (Type.ADDED == event.getType()) {
-                return Optional.of(new InstanceOnlineEvent(instanceMetaData));
-            }
-            if (Type.DELETED == event.getType()) {
-                return Optional.of(new InstanceOfflineEvent(instanceMetaData));
-            }
+    private Optional<GovernanceEvent> createKillProcessListIdEvent(final DataChangedEvent event) {
+        Matcher matcher = getKillProcessListIdMatcher(event);
+        if (!matcher.find()) {
+            return Optional.empty();
+        }
+        if (Type.ADDED == event.getType()) {
+            return Optional.of(new KillProcessListIdEvent(matcher.group(1), matcher.group(2)));
+        }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new KillProcessListIdUnitCompleteEvent(matcher.group(2)));
         }
         return Optional.empty();
     }
     
-    private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
-        return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
+    private static Matcher getKillProcessListIdMatcher(final DataChangedEvent event) {
+        Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
+        return pattern.matcher(event.getKey());
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index d165d705305..8fb3a109b9d 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.instance.utils.IpUtils;
+import org.apache.shardingsphere.infra.state.StateContext;
+import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -66,6 +68,15 @@ public final class ComputeNodeStatusServiceTest {
         verify(repository, times(1)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.emptyList()));
     }
     
+    @Test
+    public void assertPersistInstanceState() {
+        ComputeNodeStatusService computeNodeStatusService = new ComputeNodeStatusService(repository);
+        InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+        final String instanceId = instanceMetaData.getId();
+        computeNodeStatusService.persistInstanceState(instanceId, new StateContext());
+        verify(repository, times(1)).persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId), StateType.OK.name());
+    }
+    
     @Test
     public void assertPersistInstanceWorkerId() {
         InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index a615bf925ae..48ad78ab48f 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
 
+import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -38,39 +37,39 @@ public final class ComputeNodeStateChangedWatcherTest {
     
     @Test
     public void assertCreateEventWhenDisabled() {
-        Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/127.0.0.1@3307",
-                YamlEngine.marshal(Collections.singleton(ComputeNodeStatus.CIRCUIT_BREAK.name())), Type.ADDED));
+        Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
+                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/foo_instance_id", StateType.CIRCUIT_BREAK.name(), Type.ADDED));
         assertTrue(actual.isPresent());
-        assertThat(((StateEvent) actual.get()).getStatus(), is(Collections.singleton(ComputeNodeStatus.CIRCUIT_BREAK.name())));
-        assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+        assertThat(((StateEvent) actual.get()).getStatus(), is(StateType.CIRCUIT_BREAK.name()));
+        assertThat(((StateEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
     }
     
     @Test
-    public void assertCreateEventWhenDEnabled() {
+    public void assertCreateEventWhenEnabled() {
         Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
-                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/127.0.0.1@3307", "", Type.UPDATED));
+                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/foo_instance_id", "", Type.UPDATED));
         assertTrue(actual.isPresent());
         assertTrue(((StateEvent) actual.get()).getStatus().isEmpty());
-        assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+        assertThat(((StateEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
     }
     
     @Test
     public void assertCreateAddLabelEvent() {
         Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
-                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/127.0.0.1@3307",
+                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/foo_instance_id",
                         YamlEngine.marshal(Arrays.asList("label_1", "label_2")), Type.ADDED));
         assertTrue(actual.isPresent());
         assertThat(((LabelsEvent) actual.get()).getLabels(), is(Arrays.asList("label_1", "label_2")));
-        assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+        assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
     }
     
     @Test
     public void assertCreateUpdateLabelsEvent() {
         Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
-                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/127.0.0.1@3307",
+                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/foo_instance_id",
                         YamlEngine.marshal(Arrays.asList("label_1", "label_2")), Type.UPDATED));
         assertTrue(actual.isPresent());
         assertThat(((LabelsEvent) actual.get()).getLabels(), is(Arrays.asList("label_1", "label_2")));
-        assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+        assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 347a5c04893..dbe8752829b 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -143,14 +143,9 @@ public final class StateChangedSubscriberTest {
     
     @Test
     public void assertRenewInstanceStatus() {
-        Collection<String> testStates = new LinkedList<>();
-        testStates.add(StateType.OK.name());
-        StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), testStates);
+        StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), StateType.OK.name());
         subscriber.renew(mockStateEvent);
         assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
-        testStates.add(StateType.CIRCUIT_BREAK.name());
-        subscriber.renew(mockStateEvent);
-        assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
     }
     
     @Test
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
index 217147cfe63..18ac2e11717 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
@@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
@@ -51,7 +53,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
             do {
                 reRegistered = reRegister(client);
             } while (!reRegistered);
-            log.debug("instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
+            log.debug("Instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
         }
     }
     
@@ -61,8 +63,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
                 if (isNeedGenerateWorkerId()) {
                     instanceContext.generateWorkerId(new Properties());
                 }
-                repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceContext.getInstance().getCurrentInstanceId(),
-                        instanceContext.getInstance().getMetaData().getType()), instanceContext.getInstance().getMetaData().getAttributes());
+                reRegisterInstanceComputeNode();
                 return true;
             }
             sleepInterval();
@@ -77,6 +78,13 @@ public final class SessionConnectionListener implements ConnectionStateListener
         return -1 != instanceContext.getInstance().getWorkerId();
     }
     
+    private void reRegisterInstanceComputeNode() {
+        ComputeNodeInstance instance = instanceContext.getInstance();
+        repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instance.getCurrentInstanceId(), instance.getMetaData().getType()), instance.getMetaData().getAttributes());
+        repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instance.getCurrentInstanceId()), YamlEngine.marshal(instance.getLabels()));
+        repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instance.getCurrentInstanceId()), instance.getState().getCurrentState().name());
+    }
+    
     @SneakyThrows(InterruptedException.class)
     private void sleepInterval() {
         TimeUnit.SECONDS.sleep(RECONNECT_INTERVAL_SECONDS);
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
index 6b9d0fcf1e9..3ab33a7c853 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
 import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler;
 
@@ -41,7 +40,7 @@ public final class SetInstanceStatusHandler extends UpdatableRALBackendHandler<S
         } else {
             checkEnablingIsValid(contextManager, instanceId);
         }
-        contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? ComputeNodeStatus.CIRCUIT_BREAK : ComputeNodeStatus.ONLINE, instanceId));
+        contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK : StateType.OK, instanceId));
     }
     
     private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) {