You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/01/06 06:44:40 UTC

[shardingsphere] branch master updated: Add load all compute node instances for DistSQL (#14563)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c7d608  Add load all compute node instances for DistSQL (#14563)
4c7d608 is described below

commit 4c7d608d7fc16957d649f823e40435657565dc70
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Jan 6 14:43:49 2022 +0800

    Add load all compute node instances for DistSQL (#14563)
    
    * Add load all compute node instances for DistSQL
    
    * Add load all compute node instances for DistSQL
    
    * Add load all compute node instances for DistSQL
---
 .../infra/instance/ComputeNodeInstance.java        |  6 ++--
 .../infra/instance/InstanceDefinition.java         |  5 +++
 .../shardingsphere/infra/instance/InstanceId.java  |  9 +++++
 .../traffic/engine/TrafficEngine.java              |  2 +-
 .../metadata/persist/MetaDataPersistService.java   |  9 +++++
 .../mode/metadata/persist/node/ComputeNode.java    |  2 +-
 .../persist/service/ComputeNodePersistService.java | 38 ++++++++++++++++++++--
 .../common/show/executor/ShowInstanceExecutor.java | 25 +++++++-------
 8 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 036b3c6..6a84fe4 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -30,11 +30,11 @@ import java.util.Collection;
 @Setter
 public final class ComputeNodeInstance {
     
-    private String ip;
-    
-    private String port;
+    private InstanceDefinition instanceDefinition;
     
     private Collection<ShardingSphereUser> users;
     
     private Collection<String> labels;
+    
+    private Collection<String> status;
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
index 4569633..541c61e 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
@@ -40,4 +40,9 @@ public final class InstanceDefinition {
         this.instanceType = instanceType;
         instanceId = new InstanceId(port);
     }
+    
+    public InstanceDefinition(final InstanceType instanceType, final String id) {
+        this.instanceType = instanceType;
+        instanceId = new InstanceId(id);
+    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
index 21bc69d..733e7e9 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.infra.instance;
 
+import com.google.common.base.Splitter;
 import lombok.Getter;
 import org.apache.shardingsphere.infra.instance.utils.IpUtils;
 
 import java.lang.management.ManagementFactory;
+import java.util.List;
 
 /**
  * Instance id.
@@ -48,6 +50,13 @@ public final class InstanceId {
         id = String.join(DELIMITER, ip, String.valueOf(port));
     }
     
+    public InstanceId(final String id) {
+        this.id = id;
+        List<String> ids = Splitter.on("@").splitToList(id);
+        ip = ids.get(0);
+        port = Integer.valueOf(ids.get(1));
+    }
+    
     public InstanceId() {
         port = 0;
         ip = IpUtils.getIp();
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 7226ae9..20a4160 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -66,7 +66,7 @@ public final class TrafficEngine {
         List<String> result = new ArrayList<>();
         if (metaDataContexts.getMetaDataPersistService().isPresent()) {
             for (ComputeNodeInstance each : metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, labels)) {
-                result.add(each.getIp() + "@" + each.getPort());
+                result.add(each.getInstanceDefinition().getInstanceId().getId());
             }
         }
         return result;
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index a964642..69f6345 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -117,4 +117,13 @@ public final class MetaDataPersistService {
         }
         return result;
     }
+    
+    /**
+     * Load all compute node instances.
+     *
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadComputeNodeInstances() {
+        return computeNodePersistService.loadAllComputeNodeInstances();
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 1ead3ee..ad2db60 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -61,7 +61,7 @@ public final class ComputeNode {
     }
     
     /**
-     * Get online compute node instance label path.
+     * Get compute node instance label path.
      *
      * @param instanceId instance id
      * @return path of compute node instance label
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 030eb42..cb1a58e 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.mode.metadata.persist.service;
 
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceDefinition;
 import org.apache.shardingsphere.infra.instance.InstanceType;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -60,6 +61,17 @@ public final class ComputeNodePersistService {
     }
     
     /**
+     * Load instance status.
+     * 
+     * @param instanceId instance id
+     * @return collection of status
+     */
+    public Collection<String> loadInstanceStatus(final String instanceId) {
+        String yamlContent = repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+        return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+    }
+    
+    /**
      * Load all compute node instances by instance type.
      * 
      * @param instanceType instance type     
@@ -70,11 +82,31 @@ public final class ComputeNodePersistService {
         List<ComputeNodeInstance> result = new ArrayList<>(onlineComputeNodes.size());
         onlineComputeNodes.forEach(each -> {
             ComputeNodeInstance instance = new ComputeNodeInstance();
-            instance.setIp(Splitter.on("@").splitToList(each).get(0));
-            instance.setPort(Splitter.on("@").splitToList(each).get(1));
+            instance.setInstanceDefinition(new InstanceDefinition(instanceType, each));
             instance.setLabels(loadInstanceLabels(each));
+            instance.setStatus(loadInstanceStatus(each));
             result.add(instance);
         });
         return result;
     }
+    
+    /**
+     * Load all compute node instances.
+     *
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+        Collection<ComputeNodeInstance> result = new ArrayList<>();
+        Arrays.stream(InstanceType.values()).forEach(instanceType -> {
+            Collection<String> onlineComputeNodes = repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
+            onlineComputeNodes.forEach(each -> {
+                ComputeNodeInstance instance = new ComputeNodeInstance();
+                instance.setInstanceDefinition(new InstanceDefinition(instanceType, each));
+                instance.setLabels(loadInstanceLabels(each));
+                instance.setStatus(loadInstanceStatus(each));
+                result.add(instance);
+            });
+        });
+        return result;
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
index a7ee2e5..a33c4c9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.show.executor;
 
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
-import org.apache.shardingsphere.infra.instance.utils.IpUtils;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
@@ -71,12 +71,7 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
         if (null == persistService || null == persistService.getRepository()) {
             return new MultipleLocalDataMergedResult(buildInstanceRows());
         }
-        Collection<List<Object>> rows = buildInstanceRows(persistService, ENABLE);
-        Collection<List<Object>> disableInstanceIds = buildInstanceRows(persistService, DISABLE);
-        if (!disableInstanceIds.isEmpty()) {
-            rows.addAll(disableInstanceIds);
-        }
-        return new MultipleLocalDataMergedResult(rows);
+        return new MultipleLocalDataMergedResult(buildInstanceRows(persistService));
     }
     
     private Collection<List<Object>> buildInstanceRows() {
@@ -87,11 +82,11 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
         return rows;
     }
     
-    private Collection<List<Object>> buildInstanceRows(final MetaDataPersistService persistService, final String status) {
-        String statusPath = ComputeStatusNode.getStatusPath(status.equals(ENABLE) ? ComputeNodeStatus.ONLINE : ComputeNodeStatus.CIRCUIT_BREAKER);
-        List<String> instanceIds = persistService.getRepository().getChildrenKeys(statusPath);
-        if (!instanceIds.isEmpty()) {
-            return instanceIds.stream().filter(Objects::nonNull).map(each -> buildRow(each, status)).collect(Collectors.toCollection(LinkedList::new));
+    private Collection<List<Object>> buildInstanceRows(final MetaDataPersistService persistService) {
+        Collection<ComputeNodeInstance> instances = persistService.loadComputeNodeInstances();
+        if (!instances.isEmpty()) {
+            return instances.stream().filter(Objects::nonNull).map(each -> buildRow(each.getInstanceDefinition().getInstanceId().getId(), getStatus(each.getStatus())))
+                    .collect(Collectors.toCollection(LinkedList::new));
         }
         return Collections.emptyList();
     }
@@ -102,4 +97,8 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
         String port = splitInstanceId.length < 2 ? "" : splitInstanceId[1];
         return Stream.of(instanceId, host, port, status).map(each -> (Object) each).collect(Collectors.toCollection(LinkedList::new));
     }
+    
+    private String getStatus(final Collection<String> computeNodeStatus) {
+        return computeNodeStatus.isEmpty() || !computeNodeStatus.contains(ComputeNodeStatus.CIRCUIT_BREAKER.name()) ? ENABLE : DISABLE;
+    }
 }