You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/09/17 22:05:24 UTC
[shardingsphere] branch master updated: Recognize circuit breaker
status for self compute node only (#12531)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new effcd98 Recognize circuit breaker status for self compute node only (#12531)
effcd98 is described below
commit effcd98db6131d08660a31064a9e0f82c9a4d552
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Sep 18 06:04:30 2021 +0800
Recognize circuit breaker status for self compute node only (#12531)
---
.../cluster/coordinator/ClusterInstance.java | 4 ++--
.../registry/status/node/StatusNode.java | 15 ---------------
.../watcher/ComputeNodeStateChangedWatcher.java | 10 +++++-----
.../registry/status/node/StatusNodeTest.java | 16 ----------------
.../watcher/ComputeNodeStateChangedWatcherTest.java | 21 +++++++++++++++++++++
5 files changed, 28 insertions(+), 38 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
index 9cc467b..92b922e 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
@@ -35,14 +35,14 @@ public final class ClusterInstance {
private static final ClusterInstance INSTANCE = new ClusterInstance();
- private String id;
+ private volatile String id;
/**
* Init cluster instance.
*
* @param port port
*/
- public void init(final Integer port) {
+ public synchronized void init(final Integer port) {
id = String.join(DELIMITER, IpUtils.getIp(), null == port ? ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] : String.valueOf(port));
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
index e80dfe3..65c4fb4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
@@ -17,18 +17,15 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
-import com.google.common.base.Joiner;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.StorageNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.schema.ClusterSchema;
-import java.util.Arrays;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/**
* Status node.
@@ -125,16 +122,4 @@ public final class StatusNode {
public static String getPrivilegeNodePath() {
return String.join("/", "", ROOT_NODE, PRIVILEGE_NODE);
}
-
- /**
- * Find compute node.
- *
- * @param computeNodeFullPath compute node full path
- * @return compute node
- */
- public static Optional<ComputeNode> findComputeNode(final String computeNodeFullPath) {
- String status = Joiner.on("|").join(Arrays.stream(ComputeNodeStatus.values()).map(each -> each.name().toLowerCase()).collect(Collectors.toList()));
- Matcher matcher = Pattern.compile(getComputeNodePath() + "/(" + status + ")/(\\S+)$", Pattern.CASE_INSENSITIVE).matcher(computeNodeFullPath);
- return matcher.find() ? Optional.of(new ComputeNode(matcher.group(1), matcher.group(2))) : Optional.empty();
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
index 2a777c7..93fd71a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.watcher;
import org.apache.shardingsphere.infra.state.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.StatusNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -47,12 +47,12 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<S
@Override
public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent event) {
- Optional<ComputeNode> computeNode = StatusNode.findComputeNode(event.getKey());
// TODO use enum to instead of CIRCUIT_BREAK
- return computeNode.isPresent() && isCircuitBreakerStatus(computeNode.get()) ? Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
+ return isCircuitBreaker(event.getKey()) ? Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
}
- private boolean isCircuitBreakerStatus(final ComputeNode computeNode) {
- return ComputeNodeStatus.CIRCUIT_BREAKER.name().equalsIgnoreCase(computeNode.getStatus());
+ private boolean isCircuitBreaker(final String dataChangedPath) {
+ return dataChangedPath.startsWith(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER))
+ && dataChangedPath.substring(dataChangedPath.lastIndexOf("/") + 1).equals(ClusterInstance.getInstance().getId());
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
index 36ce29f..570c70b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
@@ -72,20 +72,4 @@ public final class StatusNodeTest {
public void assertGetPrivilegeNodePath() {
assertThat(StatusNode.getPrivilegeNodePath(), is("/status/privilegenode"));
}
-
- @Test
- public void assertFindCircuitBreakerComputeNode() {
- Optional<ComputeNode> actual = StatusNode.findComputeNode("/status/compute_nodes/circuit_breaker/127.0.0.1@3307");
- assertTrue(actual.isPresent());
- assertThat(actual.get().getStatus(), is("circuit_breaker"));
- assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
- }
-
- @Test
- public void assertFindOnlineComputeNode() {
- Optional<ComputeNode> actual = StatusNode.findComputeNode("/status/compute_nodes/online/127.0.0.1@3307");
- assertTrue(actual.isPresent());
- assertThat(actual.get().getStatus(), is("online"));
- assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
index 743e720..acbb972 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -18,10 +18,14 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.watcher;
import org.apache.shardingsphere.infra.state.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.util.Optional;
import static org.junit.Assert.assertFalse;
@@ -29,6 +33,23 @@ import static org.junit.Assert.assertTrue;
public final class ComputeNodeStateChangedWatcherTest {
+ private String originalClusterInstanceId;
+
+ @Before
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ originalClusterInstanceId = ClusterInstance.getInstance().getId();
+ Field field = ClusterInstance.class.getDeclaredField("id");
+ field.setAccessible(true);
+ field.set(ClusterInstance.getInstance(), "127.0.0.1@3307");
+ }
+
+ @After
+ public void tearDown() throws NoSuchFieldException, IllegalAccessException {
+ Field field = ClusterInstance.class.getDeclaredField("id");
+ field.setAccessible(true);
+ field.set(ClusterInstance.getInstance(), originalClusterInstanceId);
+ }
+
@Test
public void assertCreateEventWhenEnabled() {
Optional<StateEvent> actual = new ComputeNodeStateChangedWatcher().createGovernanceEvent(new DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", Type.ADDED));