You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/01/05 11:41:36 UTC
[shardingsphere] branch master updated: Refactor compute node circuit break event (#14548)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 60dde0d Refactor compute node circuit break event (#14548)
60dde0d is described below
commit 60dde0d2560d304e51c61dc9565b352a20ddf2ff
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Wed Jan 5 19:40:50 2022 +0800
Refactor compute node circuit break event (#14548)
---
.../mode/metadata/persist/node/ComputeNode.java | 34 ++++++++++++++++++++++
.../subscriber/ComputeNodeStatusSubscriber.java | 16 +++++++---
.../watcher/ComputeNodeStateChangedWatcher.java | 22 +++++++-------
.../ComputeNodeStateChangedWatcherTest.java | 8 +++--
4 files changed, 64 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 40ef324..1ead3ee 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -19,6 +19,9 @@ package org.apache.shardingsphere.mode.metadata.persist.node;
import org.apache.shardingsphere.infra.instance.InstanceType;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
* Compute node.
*/
@@ -66,4 +69,35 @@ public final class ComputeNode {
public static String getInstanceLabelNodePath(final String instanceId) {
return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, LABEL_NODE);
}
+
+ /**
+ * Get attributes node path.
+ *
+ * @return attributes node path
+ */
+ public static String getAttributesNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE);
+ }
+
+ /**
+ * Get instance id by status path.
+ *
+ * @param statusPath status path
+ * @return instance id
+ */
+ public static String getInstanceIdByStatus(final String statusPath) {
+ Pattern pattern = Pattern.compile(getAttributesNodePath() + "/([\\S]+)/status$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(statusPath);
+ return matcher.find() ? matcher.group(1) : "";
+ }
+
+ /**
+ * Get instance status node path.
+ *
+ * @param instanceId instance id
+ * @return instance status node path
+ */
+ public static String getInstanceStatusNodePath(final String instanceId) {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, STATUS_NODE);
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 006fad3..e999f33 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -17,14 +17,19 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
+import com.google.common.base.Strings;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import java.util.ArrayList;
+import java.util.Collection;
+
/**
* Compute node status subscriber.
*/
@@ -44,11 +49,14 @@ public final class ComputeNodeStatusSubscriber {
*/
@Subscribe
public void update(final ComputeNodeStatusChangedEvent event) {
- String computeNodePath = ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER, Instance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+ String computeStatusNodePath = ComputeNode.getInstanceStatusNodePath(Instance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+ String yamlContext = repository.get(computeStatusNodePath);
+ Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAKER) {
- repository.persist(computeNodePath, "");
+ status.add(ComputeNodeStatus.CIRCUIT_BREAKER.name());
} else {
- repository.delete(computeNodePath);
+ status.remove(ComputeNodeStatus.CIRCUIT_BREAKER.name());
}
+ repository.persist(computeStatusNodePath, YamlEngine.marshal(status));
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index ed4c954..350feba 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -17,15 +17,17 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
+import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.state.StateEvent;
import org.apache.shardingsphere.infra.state.StateType;
-import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -38,21 +40,21 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<S
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(ComputeStatusNode.getRootPath());
+ return Collections.singleton(ComputeNode.getAttributesNodePath());
}
@Override
public Collection<Type> getWatchingTypes() {
- return Arrays.asList(Type.ADDED, Type.DELETED);
+ return Arrays.asList(Type.ADDED, Type.UPDATED);
}
@Override
public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent event) {
- return isCircuitBreaker(event.getKey()) ? Optional.of(new StateEvent(StateType.CIRCUIT_BREAK, Type.ADDED == event.getType())) : Optional.empty();
- }
-
- private boolean isCircuitBreaker(final String dataChangedPath) {
- return dataChangedPath.startsWith(ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER))
- && dataChangedPath.substring(dataChangedPath.lastIndexOf("/") + 1).equals(Instance.getInstance().getId());
+ String instanceId = ComputeNode.getInstanceIdByStatus(event.getKey());
+ if (!Strings.isNullOrEmpty(instanceId)) {
+ Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
+ return Optional.of(new StateEvent(StateType.CIRCUIT_BREAK, status.contains(ComputeNodeStatus.CIRCUIT_BREAKER.name())));
+ }
+ return Optional.empty();
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index ee8a54d..b6c00c7 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import org.apache.shardingsphere.infra.state.StateEvent;
import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.After;
@@ -26,6 +28,7 @@ import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
+import java.util.Arrays;
import java.util.Optional;
import static org.junit.Assert.assertFalse;
@@ -52,14 +55,15 @@ public final class ComputeNodeStateChangedWatcherTest {
@Test
public void assertCreateEventWhenEnabled() {
- Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.ADDED));
+ Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status",
+ YamlEngine.marshal(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAKER.name())), Type.ADDED));
assertTrue(actual.isPresent());
assertTrue(actual.get().isOn());
}
@Test
public void assertCreateEventWhenDisabled() {
- Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.DELETED));
+ Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "", Type.UPDATED));
assertTrue(actual.isPresent());
assertFalse(actual.get().isOn());
}