You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/01/05 11:41:36 UTC

[shardingsphere] branch master updated: Refactor compute node circuit break event (#14548)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 60dde0d  Refactor compute node circuit break event (#14548)
60dde0d is described below

commit 60dde0d2560d304e51c61dc9565b352a20ddf2ff
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Wed Jan 5 19:40:50 2022 +0800

    Refactor compute node circuit break event (#14548)
---
 .../mode/metadata/persist/node/ComputeNode.java    | 34 ++++++++++++++++++++++
 .../subscriber/ComputeNodeStatusSubscriber.java    | 16 +++++++---
 .../watcher/ComputeNodeStateChangedWatcher.java    | 22 +++++++-------
 .../ComputeNodeStateChangedWatcherTest.java        |  8 +++--
 4 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 40ef324..1ead3ee 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -19,6 +19,9 @@ package org.apache.shardingsphere.mode.metadata.persist.node;
 
 import org.apache.shardingsphere.infra.instance.InstanceType;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Compute node.
  */
@@ -66,4 +69,35 @@ public final class ComputeNode {
     public static String getInstanceLabelNodePath(final String instanceId) {
         return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, LABEL_NODE);
     }
+    
+    /**
+     * Get attributes node path.
+     * 
+     * @return attributes node path
+     */
+    public static String getAttributesNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE);
+    }
+    
+    /**
+     * Get instance id by status path.
+     * 
+     * @param statusPath status path
+     * @return instance id
+     */
+    public static String getInstanceIdByStatus(final String statusPath) {
+        Pattern pattern = Pattern.compile(getAttributesNodePath() + "/([\\S]+)/status$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(statusPath);
+        return matcher.find() ? matcher.group(1) : "";
+    }
+    
+    /**
+     * Get instance status node path.
+     * 
+     * @param instanceId instance id
+     * @return instance status node path
+     */
+    public static String getInstanceStatusNodePath(final String instanceId) {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, STATUS_NODE);
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 006fad3..e999f33 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -17,14 +17,19 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
 
+import com.google.common.base.Strings;
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
 /**
  * Compute node status subscriber.
  */
@@ -44,11 +49,14 @@ public final class ComputeNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final ComputeNodeStatusChangedEvent event) {
-        String computeNodePath = ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER, Instance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+        String computeStatusNodePath = ComputeNode.getInstanceStatusNodePath(Instance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+        String yamlContext = repository.get(computeStatusNodePath);
+        Collection<String> status = Strings.isNullOrEmpty(yamlContext) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContext, Collection.class);
         if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAKER) {
-            repository.persist(computeNodePath, "");
+            status.add(ComputeNodeStatus.CIRCUIT_BREAKER.name());
         } else {
-            repository.delete(computeNodePath);
+            status.remove(ComputeNodeStatus.CIRCUIT_BREAKER.name());
         }
+        repository.persist(computeStatusNodePath, YamlEngine.marshal(status));
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index ed4c954..350feba 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -17,15 +17,17 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
 
+import com.google.common.base.Strings;
 import org.apache.shardingsphere.infra.state.StateEvent;
 import org.apache.shardingsphere.infra.state.StateType;
-import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,21 +40,21 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<S
     
     @Override
     public Collection<String> getWatchingKeys() {
-        return Collections.singleton(ComputeStatusNode.getRootPath());
+        return Collections.singleton(ComputeNode.getAttributesNodePath());
     }
     
     @Override
     public Collection<Type> getWatchingTypes() {
-        return Arrays.asList(Type.ADDED, Type.DELETED);
+        return Arrays.asList(Type.ADDED, Type.UPDATED);
     }
     
     @Override
     public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent event) {
-        return isCircuitBreaker(event.getKey()) ? Optional.of(new StateEvent(StateType.CIRCUIT_BREAK, Type.ADDED == event.getType())) : Optional.empty();
-    }
-    
-    private boolean isCircuitBreaker(final String dataChangedPath) {
-        return dataChangedPath.startsWith(ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER))
-                && dataChangedPath.substring(dataChangedPath.lastIndexOf("/") + 1).equals(Instance.getInstance().getId());
+        String instanceId = ComputeNode.getInstanceIdByStatus(event.getKey());
+        if (!Strings.isNullOrEmpty(instanceId)) {
+            Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
+            return Optional.of(new StateEvent(StateType.CIRCUIT_BREAK, status.contains(ComputeNodeStatus.CIRCUIT_BREAKER.name())));
+        }
+        return Optional.empty();
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index ee8a54d..b6c00c7 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import org.apache.shardingsphere.infra.state.StateEvent;
 import org.apache.shardingsphere.infra.instance.Instance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.After;
@@ -26,6 +28,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.Optional;
 
 import static org.junit.Assert.assertFalse;
@@ -52,14 +55,15 @@ public final class ComputeNodeStateChangedWatcherTest {
     
     @Test
     public void assertCreateEventWhenEnabled() {
-        Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.ADDED));
+        Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", 
+                YamlEngine.marshal(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAKER.name())), Type.ADDED));
         assertTrue(actual.isPresent());
         assertTrue(actual.get().isOn());
     }
     
     @Test
     public void assertCreateEventWhenDisabled() {
-        Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.DELETED));
+        Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "", Type.UPDATED));
         assertTrue(actual.isPresent());
         assertFalse(actual.get().isOn());
     }