You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/30 05:54:07 UTC
[shardingsphere] branch master updated: Complete circuit break
instance by DistSQL (#12841)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f081cc3 Complete circuit break instance by DistSQL (#12841)
f081cc3 is described below
commit f081cc3e1236f6acc8b40bc6bf9ad018019c8f2d
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Sep 30 13:53:36 2021 +0800
Complete circuit break instance by DistSQL (#12841)
---
.../cluster/coordinator/ClusterInstance.java | 11 +++++
.../cluster/coordinator/RegistryCenter.java | 4 +-
.../event/ComputeNodeStatusChangedEvent.java} | 39 ++++------------
.../subscriber/ComputeNodeStatusSubscriber.java | 54 ++++++++++++++++++++++
.../cluster/coordinator/ClusterInstanceTest.java | 6 +++
.../common/set/SetStatementExecutorFactory.java | 2 +-
.../set/excutor/SetInstanceStatusExecutor.java | 17 +++----
7 files changed, 90 insertions(+), 43 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
index 92b922e..0841c6f 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
@@ -47,6 +47,17 @@ public final class ClusterInstance {
}
/**
+ * Get instance id.
+ *
+ * @param ip ip
+ * @param port port
+ * @return instance id
+ */
+ public String getInstanceId(final String ip, final String port) {
+ return String.join(DELIMITER, ip, port);
+ }
+
+ /**
* Get instance.
*
* @return singleton instance
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 2644394..220c17b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -24,8 +24,9 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.subscriber.GlobalRuleRegistrySubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.SchemaMetaDataRegistrySubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessRegistrySubscriber;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -61,6 +62,7 @@ public final class RegistryCenter {
private void createSubscribers(final ClusterPersistRepository repository) {
new SchemaMetaDataRegistrySubscriber(repository);
new GlobalRuleRegistrySubscriber(repository);
+ new ComputeNodeStatusSubscriber(repository);
new StorageNodeStatusSubscriber(repository);
new ScalingRegistrySubscriber(repository);
new ProcessRegistrySubscriber(repository);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
similarity index 53%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
index 92b922e..2e6f278 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
@@ -15,43 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
-import lombok.AccessLevel;
import lombok.Getter;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.utils.IpUtils;
-
-import java.lang.management.ManagementFactory;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
/**
- * Cluster instance.
+ * Compute node status changed event.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
@Getter
-public final class ClusterInstance {
-
- private static final String DELIMITER = "@";
-
- private static final ClusterInstance INSTANCE = new ClusterInstance();
+public final class ComputeNodeStatusChangedEvent {
- private volatile String id;
+ private final ComputeNodeStatus status;
- /**
- * Init cluster instance.
- *
- * @param port port
- */
- public synchronized void init(final Integer port) {
- id = String.join(DELIMITER, IpUtils.getIp(), null == port ? ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] : String.valueOf(port));
- }
+ private final String ip;
- /**
- * Get instance.
- *
- * @return singleton instance
- */
- public static ClusterInstance getInstance() {
- return INSTANCE;
- }
+ private final String port;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatus [...]
new file mode 100644
index 0000000..b436bf0
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Compute node status subscriber.
+ */
+public final class ComputeNodeStatusSubscriber {
+
+ private final ClusterPersistRepository repository;
+
+ public ComputeNodeStatusSubscriber(final ClusterPersistRepository repository) {
+ this.repository = repository;
+ ShardingSphereEventBus.getInstance().register(this);
+ }
+
+ /**
+ * Update compute node status.
+ *
+ * @param event compute node status changed event
+ */
+ @Subscribe
+ public void update(final ComputeNodeStatusChangedEvent event) {
+ String computeNodePath = ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER, ClusterInstance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+ if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAKER) {
+ repository.persist(computeNodePath, "");
+ } else {
+ repository.delete(computeNodePath);
+ }
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
index 789e7f7..eac78b2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
@@ -46,4 +46,10 @@ public final class ClusterInstanceTest {
assertThat(id.split("@").length, is(2));
assertThat(id, is(Joiner.on("@").join(ip, pid)));
}
+
+ @Test
+ public void assertGetInstanceId() {
+ ClusterInstance.getInstance().init(null);
+ assertThat(ClusterInstance.getInstance().getInstanceId("127.0.0.1", "3307"), is("127.0.0.1@3307"));
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
index 1d576b9..c662088 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
@@ -50,7 +50,7 @@ public final class SetStatementExecutorFactory {
return new SetReadwriteSplittingStatusExecutor((SetReadwriteSplittingStatusStatement) sqlStatement, backendConnection);
}
if (sqlStatement instanceof SetInstanceStatusStatement) {
- return new SetInstanceStatusExecutor((SetInstanceStatusStatement) sqlStatement, backendConnection);
+ return new SetInstanceStatusExecutor((SetInstanceStatusStatement) sqlStatement);
}
throw new UnsupportedTypeException(sqlStatement.getClass().getCanonicalName());
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
index 715d43c..81e6cdd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
@@ -20,15 +20,13 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.excu
import lombok.AllArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.common.status.SetInstanceStatusStatement;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.SetStatementExecutor;
-import java.util.Optional;
-
/**
* Set instance status executor.
*/
@@ -37,14 +35,11 @@ public final class SetInstanceStatusExecutor implements SetStatementExecutor {
private final SetInstanceStatusStatement sqlStatement;
- private final BackendConnection backendConnection;
-
@Override
public ResponseHeader execute() throws DistSQLException {
- String ip = sqlStatement.getIp();
- String port = sqlStatement.getPort();
- Optional<MetaDataPersistService> persistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
- //TODO Need to circuit breaker API support.
+ // add more instance check here
+ ShardingSphereEventBus.getInstance().post(new ComputeNodeStatusChangedEvent("DISABLE".equals(sqlStatement.getStatus()) ? ComputeNodeStatus.CIRCUIT_BREAKER : ComputeNodeStatus.ONLINE,
+ sqlStatement.getIp(), sqlStatement.getPort()));
return new UpdateResponseHeader(sqlStatement);
}
}