You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/11/21 02:19:03 UTC
[shardingsphere] branch master updated: Refactor enable and disable instance state (#22291)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0160f941322 Refactor enable and disable instance state (#22291)
0160f941322 is described below
commit 0160f941322ddede730f3acc6b8cea8cae153940
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon Nov 21 10:18:54 2022 +0800
Refactor enable and disable instance state (#22291)
---
.../infra/instance/ComputeNodeInstance.java | 4 +-
.../infra/instance/InstanceContext.java | 4 +-
.../infra/instance/InstanceContextTest.java | 5 +-
.../cluster/coordinator/RegistryCenter.java | 1 +
.../registry/status/compute/ComputeNodeStatus.java | 26 ---------
.../event/ComputeNodeStatusChangedEvent.java | 4 +-
.../registry/status/compute/event/StateEvent.java | 4 +-
.../compute/service/ComputeNodeStatusService.java | 16 +++++-
.../subscriber/ComputeNodeStatusSubscriber.java | 15 +----
.../watcher/ComputeNodeStateChangedWatcher.java | 65 +++++++++++-----------
.../service/ComputeNodeStatusServiceTest.java | 11 ++++
.../ComputeNodeStateChangedWatcherTest.java | 25 ++++-----
.../subscriber/StateChangedSubscriberTest.java | 7 +--
.../listener/SessionConnectionListener.java | 14 ++++-
.../ral/updatable/SetInstanceStatusHandler.java | 3 +-
15 files changed, 92 insertions(+), 112 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 54d594d7fe5..70f8647766d 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -62,8 +62,8 @@ public final class ComputeNodeInstance {
*
* @param status status
*/
- public void switchState(final Collection<String> status) {
- state.switchState(StateType.CIRCUIT_BREAK, null != status && status.contains(StateType.CIRCUIT_BREAK.name()));
+ public void switchState(final String status) {
+ state.switchState(StateType.CIRCUIT_BREAK, StateType.CIRCUIT_BREAK.name().equals(status));
}
/**
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 6a911b8227b..beab858732d 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -60,14 +60,14 @@ public final class InstanceContext {
* @param instanceId instance id
* @param status status
*/
- public void updateInstanceStatus(final String instanceId, final Collection<String> status) {
+ public void updateInstanceStatus(final String instanceId, final String status) {
if (instance.getMetaData().getId().equals(instanceId)) {
instance.switchState(status);
}
updateRelatedComputeNodeInstancesStatus(instanceId, status);
}
- private void updateRelatedComputeNodeInstancesStatus(final String instanceId, final Collection<String> status) {
+ private void updateRelatedComputeNodeInstancesStatus(final String instanceId, final String status) {
for (ComputeNodeInstance each : allClusterInstances) {
if (each.getMetaData().getId().equals(instanceId)) {
each.switchState(status);
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index fba18d1c02d..83bf38a9a00 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
@@ -55,10 +54,10 @@ public final class InstanceContextTest {
InstanceContext context = new InstanceContext(new ComputeNodeInstance(instanceMetaData), new WorkerIdGeneratorFixture(Integer.MIN_VALUE), modeConfig, lockContext, eventBusContext);
StateType actual = context.getInstance().getState().getCurrentState();
assertThat(actual, is(StateType.OK));
- context.updateInstanceStatus(instanceMetaData.getId(), Collections.singleton(StateType.CIRCUIT_BREAK.name()));
+ context.updateInstanceStatus(instanceMetaData.getId(), StateType.CIRCUIT_BREAK.name());
actual = context.getInstance().getState().getCurrentState();
assertThat(actual, is(StateType.CIRCUIT_BREAK));
- context.updateInstanceStatus(instanceMetaData.getId(), Collections.emptyList());
+ context.updateInstanceStatus(instanceMetaData.getId(), StateType.OK.name());
actual = context.getInstance().getState().getCurrentState();
assertThat(actual, is(StateType.OK));
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index ec4e6f57b1b..081ee83bafa 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -95,6 +95,7 @@ public final class RegistryCenter {
public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getLabels());
+ computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(), computeNodeInstance.getState());
listenerFactory.watchListeners();
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java
deleted file mode 100644
index 8ecb537ba89..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/ComputeNodeStatus.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute;
-
-/**
- * Compute node status.
- */
-public enum ComputeNodeStatus {
-
- ONLINE, CIRCUIT_BREAK
-}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
index 8f443669bda..fd7d5191ebe 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.infra.state.StateType;
/**
* Compute node status changed event.
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
@Getter
public final class ComputeNodeStatusChangedEvent {
- private final ComputeNodeStatus status;
+ private final StateType state;
private final String instanceId;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
index 79e81a3320b..54dcfa49401 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
@@ -21,8 +21,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import java.util.Collection;
-
/**
* State event.
*/
@@ -32,5 +30,5 @@ public final class StateEvent implements GovernanceEvent {
private final String instanceId;
- private final Collection<String> status;
+ private final String status;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 8265af790e8..35d8dbf57f4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataBuilderFactory;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -65,6 +66,16 @@ public final class ComputeNodeStatusService {
}
}
+ /**
+ * Persist instance state.
+ *
+ * @param instanceId instance id
+ * @param state state context
+ */
+ public void persistInstanceState(final String instanceId, final StateContext state) {
+ repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId), state.getCurrentState().name());
+ }
+
/**
* Persist instance worker id.
*
@@ -94,9 +105,8 @@ public final class ComputeNodeStatusService {
* @return status
*/
@SuppressWarnings("unchecked")
- public Collection<String> loadInstanceStatus(final String instanceId) {
- String yamlContent = repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+ public String loadInstanceStatus(final String instanceId) {
+ return repository.getDirectly(ComputeNode.getInstanceStatusNodePath(instanceId));
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 1799b1b6b8e..7f36f2b7acb 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -17,18 +17,13 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
-import com.google.common.base.Strings;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
/**
@@ -54,15 +49,7 @@ public final class ComputeNodeStatusSubscriber {
*/
@Subscribe
public void update(final ComputeNodeStatusChangedEvent event) {
- String computeStatusNodePath = ComputeNode.getInstanceStatusNodePath(event.getInstanceId());
- String yamlContext = repository.getDirectly(computeStatusNodePath);
- Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
- if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAK) {
- status.add(ComputeNodeStatus.CIRCUIT_BREAK.name());
- } else {
- status.remove(ComputeNodeStatus.CIRCUIT_BREAK.name());
- }
- repository.persistEphemeral(computeStatusNodePath, YamlEngine.marshal(status));
+ repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(event.getInstanceId()), event.getState().name());
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index beb7170fa47..9bff63168e1 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -65,16 +65,15 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
if (!Strings.isNullOrEmpty(instanceId)) {
- if (event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId))) {
- Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
- return Optional.of(new StateEvent(instanceId, status));
+ if (event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId)) && Type.DELETED != event.getType()) {
+ return Optional.of(new StateEvent(instanceId, event.getValue()));
+ }
+ if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId)) && Type.DELETED != event.getType()) {
+ return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
}
if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Integer.valueOf(event.getValue())));
}
- if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
- return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
- }
}
if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
return createInstanceEvent(event);
@@ -88,20 +87,24 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
return Optional.empty();
}
- private Optional<GovernanceEvent> createKillProcessListIdEvent(final DataChangedEvent event) {
- Matcher matcher = getKillProcessListIdMatcher(event);
- if (!matcher.find()) {
- return Optional.empty();
- }
- if (Type.ADDED == event.getType()) {
- return Optional.of(new KillProcessListIdEvent(matcher.group(1), matcher.group(2)));
- }
- if (Type.DELETED == event.getType()) {
- return Optional.of(new KillProcessListIdUnitCompleteEvent(matcher.group(2)));
+ private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
+ Matcher matcher = matchInstanceOnlinePath(event.getKey());
+ if (matcher.find()) {
+ InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), event.getValue());
+ if (Type.ADDED == event.getType()) {
+ return Optional.of(new InstanceOnlineEvent(instanceMetaData));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new InstanceOfflineEvent(instanceMetaData));
+ }
}
return Optional.empty();
}
+ private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
+ return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
+ }
+
private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
Matcher matcher = getShowProcessTriggerMatcher(event);
if (!matcher.find()) {
@@ -120,26 +123,22 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
return Pattern.compile(ComputeNode.getProcessTriggerNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
}
- private static Matcher getKillProcessListIdMatcher(final DataChangedEvent event) {
- Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
- return pattern.matcher(event.getKey());
- }
-
- private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
- Matcher matcher = matchInstanceOnlinePath(event.getKey());
- if (matcher.find()) {
- InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), event.getValue());
- if (Type.ADDED == event.getType()) {
- return Optional.of(new InstanceOnlineEvent(instanceMetaData));
- }
- if (Type.DELETED == event.getType()) {
- return Optional.of(new InstanceOfflineEvent(instanceMetaData));
- }
+ private Optional<GovernanceEvent> createKillProcessListIdEvent(final DataChangedEvent event) {
+ Matcher matcher = getKillProcessListIdMatcher(event);
+ if (!matcher.find()) {
+ return Optional.empty();
+ }
+ if (Type.ADDED == event.getType()) {
+ return Optional.of(new KillProcessListIdEvent(matcher.group(1), matcher.group(2)));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new KillProcessListIdUnitCompleteEvent(matcher.group(2)));
}
return Optional.empty();
}
- private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
- return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
+ private static Matcher getKillProcessListIdMatcher(final DataChangedEvent event) {
+ Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
+ return pattern.matcher(event.getKey());
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index d165d705305..8fb3a109b9d 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.instance.utils.IpUtils;
+import org.apache.shardingsphere.infra.state.StateContext;
+import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -66,6 +68,15 @@ public final class ComputeNodeStatusServiceTest {
verify(repository, times(1)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.emptyList()));
}
+ @Test
+ public void assertPersistInstanceState() {
+ ComputeNodeStatusService computeNodeStatusService = new ComputeNodeStatusService(repository);
+ InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
+ final String instanceId = instanceMetaData.getId();
+ computeNodeStatusService.persistInstanceState(instanceId, new StateContext());
+ verify(repository, times(1)).persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId), StateType.OK.name());
+ }
+
@Test
public void assertPersistInstanceWorkerId() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index a615bf925ae..48ad78ab48f 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
+import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import org.junit.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
@@ -38,39 +37,39 @@ public final class ComputeNodeStateChangedWatcherTest {
@Test
public void assertCreateEventWhenDisabled() {
- Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/127.0.0.1@3307",
- YamlEngine.marshal(Collections.singleton(ComputeNodeStatus.CIRCUIT_BREAK.name())), Type.ADDED));
+ Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
+ .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/foo_instance_id", StateType.CIRCUIT_BREAK.name(), Type.ADDED));
assertTrue(actual.isPresent());
- assertThat(((StateEvent) actual.get()).getStatus(), is(Collections.singleton(ComputeNodeStatus.CIRCUIT_BREAK.name())));
- assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+ assertThat(((StateEvent) actual.get()).getStatus(), is(StateType.CIRCUIT_BREAK.name()));
+ assertThat(((StateEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
}
@Test
- public void assertCreateEventWhenDEnabled() {
+ public void assertCreateEventWhenEnabled() {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/127.0.0.1@3307", "", Type.UPDATED));
+ .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/status/foo_instance_id", "", Type.UPDATED));
assertTrue(actual.isPresent());
assertTrue(((StateEvent) actual.get()).getStatus().isEmpty());
- assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+ assertThat(((StateEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
}
@Test
public void assertCreateAddLabelEvent() {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/127.0.0.1@3307",
+ .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/foo_instance_id",
YamlEngine.marshal(Arrays.asList("label_1", "label_2")), Type.ADDED));
assertTrue(actual.isPresent());
assertThat(((LabelsEvent) actual.get()).getLabels(), is(Arrays.asList("label_1", "label_2")));
- assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+ assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
}
@Test
public void assertCreateUpdateLabelsEvent() {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/127.0.0.1@3307",
+ .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/labels/foo_instance_id",
YamlEngine.marshal(Arrays.asList("label_1", "label_2")), Type.UPDATED));
assertTrue(actual.isPresent());
assertThat(((LabelsEvent) actual.get()).getLabels(), is(Arrays.asList("label_1", "label_2")));
- assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
+ assertThat(((LabelsEvent) actual.get()).getInstanceId(), is("foo_instance_id"));
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 347a5c04893..dbe8752829b 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -143,14 +143,9 @@ public final class StateChangedSubscriberTest {
@Test
public void assertRenewInstanceStatus() {
- Collection<String> testStates = new LinkedList<>();
- testStates.add(StateType.OK.name());
- StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), testStates);
+ StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), StateType.OK.name());
subscriber.renew(mockStateEvent);
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.OK));
- testStates.add(StateType.CIRCUIT_BREAK.name());
- subscriber.renew(mockStateEvent);
- assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
}
@Test
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
index 217147cfe63..18ac2e11717 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
@@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
@@ -51,7 +53,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
do {
reRegistered = reRegister(client);
} while (!reRegistered);
- log.debug("instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
+ log.debug("Instance re-register success instance id: {}", instanceContext.getInstance().getCurrentInstanceId());
}
}
@@ -61,8 +63,7 @@ public final class SessionConnectionListener implements ConnectionStateListener
if (isNeedGenerateWorkerId()) {
instanceContext.generateWorkerId(new Properties());
}
- repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceContext.getInstance().getCurrentInstanceId(),
- instanceContext.getInstance().getMetaData().getType()), instanceContext.getInstance().getMetaData().getAttributes());
+ reRegisterInstanceComputeNode();
return true;
}
sleepInterval();
@@ -77,6 +78,13 @@ public final class SessionConnectionListener implements ConnectionStateListener
return -1 != instanceContext.getInstance().getWorkerId();
}
+ private void reRegisterInstanceComputeNode() {
+ ComputeNodeInstance instance = instanceContext.getInstance();
+ repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instance.getCurrentInstanceId(), instance.getMetaData().getType()), instance.getMetaData().getAttributes());
+ repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instance.getCurrentInstanceId()), YamlEngine.marshal(instance.getLabels()));
+ repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instance.getCurrentInstanceId()), instance.getState().getCurrentState().name());
+ }
+
@SneakyThrows(InterruptedException.class)
private void sleepInterval() {
TimeUnit.SECONDS.sleep(RECONNECT_INTERVAL_SECONDS);
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
index 6b9d0fcf1e9..3ab33a7c853 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler;
@@ -41,7 +40,7 @@ public final class SetInstanceStatusHandler extends UpdatableRALBackendHandler<S
} else {
checkEnablingIsValid(contextManager, instanceId);
}
- contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? ComputeNodeStatus.CIRCUIT_BREAK : ComputeNodeStatus.ONLINE, instanceId));
+ contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK : StateType.OK, instanceId));
}
private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) {