You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/30 05:54:07 UTC

[shardingsphere] branch master updated: Complete circuit break instance by DistSQL (#12841)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f081cc3  Complete circuit break instance by DistSQL (#12841)
f081cc3 is described below

commit f081cc3e1236f6acc8b40bc6bf9ad018019c8f2d
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Sep 30 13:53:36 2021 +0800

    Complete circuit break instance by DistSQL (#12841)
---
 .../cluster/coordinator/ClusterInstance.java       | 11 +++++
 .../cluster/coordinator/RegistryCenter.java        |  4 +-
 .../event/ComputeNodeStatusChangedEvent.java}      | 39 ++++------------
 .../subscriber/ComputeNodeStatusSubscriber.java    | 54 ++++++++++++++++++++++
 .../cluster/coordinator/ClusterInstanceTest.java   |  6 +++
 .../common/set/SetStatementExecutorFactory.java    |  2 +-
 .../set/excutor/SetInstanceStatusExecutor.java     | 17 +++----
 7 files changed, 90 insertions(+), 43 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
index 92b922e..0841c6f 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
@@ -47,6 +47,17 @@ public final class ClusterInstance {
     }
     
     /**
+     * Get instance id.
+     * 
+     * @param ip ip
+     * @param port port
+     * @return instance id
+     */
+    public String getInstanceId(final String ip, final String port) {
+        return String.join(DELIMITER, ip, port);
+    }
+    
+    /**
      * Get instance.
      *
      * @return singleton instance
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 2644394..220c17b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -24,8 +24,9 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.subscriber.GlobalRuleRegistrySubscriber;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.SchemaMetaDataRegistrySubscriber;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessRegistrySubscriber;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -61,6 +62,7 @@ public final class RegistryCenter {
     private void createSubscribers(final ClusterPersistRepository repository) {
         new SchemaMetaDataRegistrySubscriber(repository);
         new GlobalRuleRegistrySubscriber(repository);
+        new ComputeNodeStatusSubscriber(repository);
         new StorageNodeStatusSubscriber(repository);
         new ScalingRegistrySubscriber(repository);
         new ProcessRegistrySubscriber(repository);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
similarity index 53%
copy from shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
copy to shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
index 92b922e..2e6f278 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstance.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
@@ -15,43 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
 
-import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.utils.IpUtils;
-
-import java.lang.management.ManagementFactory;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 
 /**
- * Cluster instance.
+ * Compute node status changed event.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
 @Getter
-public final class ClusterInstance {
-    
-    private static final String DELIMITER = "@";
-    
-    private static final ClusterInstance INSTANCE = new ClusterInstance();
+public final class ComputeNodeStatusChangedEvent {
     
-    private volatile String id;
+    private final ComputeNodeStatus status;
     
-    /**
-     * Init cluster instance.
-     * 
-     * @param port port
-     */
-    public synchronized void init(final Integer port) {
-        id = String.join(DELIMITER, IpUtils.getIp(), null == port ? ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] : String.valueOf(port));
-    }
+    private final String ip;
     
-    /**
-     * Get instance.
-     *
-     * @return singleton instance
-     */
-    public static ClusterInstance getInstance() {
-        return INSTANCE;
-    }
+    private final String port;
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatus [...]
new file mode 100644
index 0000000..b436bf0
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterInstance;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Compute node status subscriber.
+ */
+public final class ComputeNodeStatusSubscriber {
+    
+    private final ClusterPersistRepository repository;
+    
+    public ComputeNodeStatusSubscriber(final ClusterPersistRepository repository) {
+        this.repository = repository;
+        ShardingSphereEventBus.getInstance().register(this);
+    }
+    
+    /**
+     * Update compute node status.
+     *
+     * @param event compute node status changed event
+     */
+    @Subscribe
+    public void update(final ComputeNodeStatusChangedEvent event) {
+        String computeNodePath = ComputeStatusNode.getStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER, ClusterInstance.getInstance().getInstanceId(event.getIp(), event.getPort()));
+        if (event.getStatus() == ComputeNodeStatus.CIRCUIT_BREAKER) {
+            repository.persist(computeNodePath, "");
+        } else {
+            repository.delete(computeNodePath);
+        }
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
index 789e7f7..eac78b2 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterInstanceTest.java
@@ -46,4 +46,10 @@ public final class ClusterInstanceTest {
         assertThat(id.split("@").length, is(2));
         assertThat(id, is(Joiner.on("@").join(ip, pid)));
     }
+    
+    @Test
+    public void assertGetInstanceId() {
+        ClusterInstance.getInstance().init(null);
+        assertThat(ClusterInstance.getInstance().getInstanceId("127.0.0.1", "3307"), is("127.0.0.1@3307"));
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
index 1d576b9..c662088 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/SetStatementExecutorFactory.java
@@ -50,7 +50,7 @@ public final class SetStatementExecutorFactory {
             return new SetReadwriteSplittingStatusExecutor((SetReadwriteSplittingStatusStatement) sqlStatement, backendConnection);
         }
         if (sqlStatement instanceof SetInstanceStatusStatement) {
-            return new SetInstanceStatusExecutor((SetInstanceStatusStatement) sqlStatement, backendConnection);
+            return new SetInstanceStatusExecutor((SetInstanceStatusStatement) sqlStatement);
         }
         throw new UnsupportedTypeException(sqlStatement.getClass().getCanonicalName());
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
index 715d43c..81e6cdd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/set/excutor/SetInstanceStatusExecutor.java
@@ -20,15 +20,13 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.excu
 import lombok.AllArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.common.status.SetInstanceStatusStatement;
 import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.set.SetStatementExecutor;
 
-import java.util.Optional;
-
 /**
  * Set instance status executor.
  */
@@ -37,14 +35,11 @@ public final class SetInstanceStatusExecutor implements SetStatementExecutor {
     
     private final SetInstanceStatusStatement sqlStatement;
     
-    private final BackendConnection backendConnection;
-    
     @Override
     public ResponseHeader execute() throws DistSQLException {
-        String ip = sqlStatement.getIp();
-        String port = sqlStatement.getPort();
-        Optional<MetaDataPersistService> persistService = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
-        //TODO Need to circuit breaker API support.
+        // add more instance check here
+        ShardingSphereEventBus.getInstance().post(new ComputeNodeStatusChangedEvent("DISABLE".equals(sqlStatement.getStatus()) ? ComputeNodeStatus.CIRCUIT_BREAKER : ComputeNodeStatus.ONLINE, 
+                sqlStatement.getIp(), sqlStatement.getPort()));
         return new UpdateResponseHeader(sqlStatement);
     }
 }