You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/09/17 22:05:24 UTC

[shardingsphere] branch master updated: Recognize circuit breaker status for self compute node only (#12531)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new effcd98  Recognize circuit breaker status for self compute node only (#12531)
effcd98 is described below

commit effcd98db6131d08660a31064a9e0f82c9a4d552
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Sep 18 06:04:30 2021 +0800

    Recognize circuit breaker status for self compute node only (#12531)
---
 .../cluster/coordinator/ClusterInstance.java        |  4 ++--
 .../registry/status/node/StatusNode.java            | 15 ---------------
 .../watcher/ComputeNodeStateChangedWatcher.java     | 10 +++++-----
 .../registry/status/node/StatusNodeTest.java        | 16 ----------------
 .../watcher/ComputeNodeStateChangedWatcherTest.java | 21 +++++++++++++++++++++
 5 files changed, 28 insertions(+), 38 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
index 9cc467b..92b922e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
@@ -35,14 +35,14 @@ public final class ClusterInstance {
     
     private static final ClusterInstance INSTANCE = new ClusterInstance();
     
-    private String id;
+    private volatile String id;
     
     /**
      * Init cluster instance.
      * 
      * @param port port
      */
-    public void init(final Integer port) {
+    public synchronized void init(final Integer port) {
         id = String.join(DELIMITER, IpUtils.getIp(), null == port ? ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] : String.valueOf(port));
     }
     
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
index e80dfe3..65c4fb4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
@@ -17,18 +17,15 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
 
-import com.google.common.base.Joiner;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.StorageNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.schema.ClusterSchema;
 
-import java.util.Arrays;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Status node.
@@ -125,16 +122,4 @@ public final class StatusNode {
     public static String getPrivilegeNodePath() {
         return String.join("/", "", ROOT_NODE, PRIVILEGE_NODE);
     }
-    
-    /**
-     * Find compute node.
-     *
-     * @param computeNodeFullPath compute node full path
-     * @return compute node
-     */
-    public static Optional<ComputeNode> findComputeNode(final String computeNodeFullPath) {
-        String status = Joiner.on("|").join(Arrays.stream(ComputeNodeStatus.values()).map(each -> each.name().toLowerCase()).collect(Collectors.toList()));
-        Matcher matcher = Pattern.compile(getComputeNodePath() + "/(" + status + ")/(\\S+)$", Pattern.CASE_INSENSITIVE).matcher(computeNodeFullPath);
-        return matcher.find() ? Optional.of(new ComputeNode(matcher.group(1), matcher.group(2))) : Optional.empty();
-    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
index 2a777c7..93fd71a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.watcher;
 
 import org.apache.shardingsphere.infra.state.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.ComputeNode;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.StatusNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -47,12 +47,12 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<S
     
     @Override
     public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent event) {
-        Optional<ComputeNode> computeNode = StatusNode.findComputeNode(event.getKey());
         // TODO use enum to instead of CIRCUIT_BREAK
-        return computeNode.isPresent() && isCircuitBreakerStatus(computeNode.get()) ? Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
+        return isCircuitBreaker(event.getKey()) ? Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
     }
     
-    private boolean isCircuitBreakerStatus(final ComputeNode computeNode) {
-        return ComputeNodeStatus.CIRCUIT_BREAKER.name().equalsIgnoreCase(computeNode.getStatus());
+    private boolean isCircuitBreaker(final String dataChangedPath) {
+        return dataChangedPath.startsWith(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER))
+                && dataChangedPath.substring(dataChangedPath.lastIndexOf("/") + 1).equals(ClusterInstance.getInstance().getId());
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
index 36ce29f..570c70b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
@@ -72,20 +72,4 @@ public final class StatusNodeTest {
     public void assertGetPrivilegeNodePath() {
         assertThat(StatusNode.getPrivilegeNodePath(), is("/status/privilegenode"));
     }
-    
-    @Test
-    public void assertFindCircuitBreakerComputeNode() {
-        Optional<ComputeNode> actual = StatusNode.findComputeNode("/status/compute_nodes/circuit_breaker/127.0.0.1@3307");
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getStatus(), is("circuit_breaker"));
-        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
-    }
-    
-    @Test
-    public void assertFindOnlineComputeNode() {
-        Optional<ComputeNode> actual = StatusNode.findComputeNode("/status/compute_nodes/online/127.0.0.1@3307");
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getStatus(), is("online"));
-        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
-    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
index 743e720..acbb972 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -18,10 +18,14 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.watcher;
 
 import org.apache.shardingsphere.infra.state.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.util.Optional;
 
 import static org.junit.Assert.assertFalse;
@@ -29,6 +33,23 @@ import static org.junit.Assert.assertTrue;
 
 public final class ComputeNodeStateChangedWatcherTest {
     
+    private String originalClusterInstanceId;
+    
+    @Before
+    public void setUp() throws NoSuchFieldException, IllegalAccessException {
+        originalClusterInstanceId = ClusterInstance.getInstance().getId();
+        Field field = ClusterInstance.class.getDeclaredField("id");
+        field.setAccessible(true);
+        field.set(ClusterInstance.getInstance(), "127.0.0.1@3307");
+    }
+    
+    @After
+    public void tearDown() throws NoSuchFieldException, IllegalAccessException {
+        Field field = ClusterInstance.class.getDeclaredField("id");
+        field.setAccessible(true);
+        field.set(ClusterInstance.getInstance(), originalClusterInstanceId);
+    }
+    
     @Test
     public void assertCreateEventWhenEnabled() {
         Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.ADDED));