You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/01/06 06:44:40 UTC
[shardingsphere] branch master updated: Add load all compute node instances for DistSQL (#14563)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4c7d608 Add load all compute node instances for DistSQL (#14563)
4c7d608 is described below
commit 4c7d608d7fc16957d649f823e40435657565dc70
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Jan 6 14:43:49 2022 +0800
Add load all compute node instances for DistSQL (#14563)
* Add load all compute node instances for DistSQL
* Add load all compute node instances for DistSQL
* Add load all compute node instances for DistSQL
---
.../infra/instance/ComputeNodeInstance.java | 6 ++--
.../infra/instance/InstanceDefinition.java | 5 +++
.../shardingsphere/infra/instance/InstanceId.java | 9 +++++
.../traffic/engine/TrafficEngine.java | 2 +-
.../metadata/persist/MetaDataPersistService.java | 9 +++++
.../mode/metadata/persist/node/ComputeNode.java | 2 +-
.../persist/service/ComputeNodePersistService.java | 38 ++++++++++++++++++++--
.../common/show/executor/ShowInstanceExecutor.java | 25 +++++++-------
8 files changed, 75 insertions(+), 21 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 036b3c6..6a84fe4 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -30,11 +30,11 @@ import java.util.Collection;
@Setter
public final class ComputeNodeInstance {
- private String ip;
-
- private String port;
+ private InstanceDefinition instanceDefinition;
private Collection<ShardingSphereUser> users;
private Collection<String> labels;
+
+ private Collection<String> status;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
index 4569633..541c61e 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceDefinition.java
@@ -40,4 +40,9 @@ public final class InstanceDefinition {
this.instanceType = instanceType;
instanceId = new InstanceId(port);
}
+
+ public InstanceDefinition(final InstanceType instanceType, final String id) {
+ this.instanceType = instanceType;
+ instanceId = new InstanceId(id);
+ }
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
index 21bc69d..733e7e9 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceId.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.infra.instance;
+import com.google.common.base.Splitter;
import lombok.Getter;
import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import java.lang.management.ManagementFactory;
+import java.util.List;
/**
* Instance id.
@@ -48,6 +50,13 @@ public final class InstanceId {
id = String.join(DELIMITER, ip, String.valueOf(port));
}
+ public InstanceId(final String id) {
+ this.id = id;
+ List<String> ids = Splitter.on("@").splitToList(id);
+ ip = ids.get(0);
+ port = Integer.valueOf(ids.get(1));
+ }
+
public InstanceId() {
port = 0;
ip = IpUtils.getIp();
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 7226ae9..20a4160 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -66,7 +66,7 @@ public final class TrafficEngine {
List<String> result = new ArrayList<>();
if (metaDataContexts.getMetaDataPersistService().isPresent()) {
for (ComputeNodeInstance each : metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, labels)) {
- result.add(each.getIp() + "@" + each.getPort());
+ result.add(each.getInstanceDefinition().getInstanceId().getId());
}
}
return result;
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index a964642..69f6345 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -117,4 +117,13 @@ public final class MetaDataPersistService {
}
return result;
}
+
+ /**
+ * Load all compute node instances.
+ *
+ * @return collection of compute node instance
+ */
+ public Collection<ComputeNodeInstance> loadComputeNodeInstances() {
+ return computeNodePersistService.loadAllComputeNodeInstances();
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 1ead3ee..ad2db60 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -61,7 +61,7 @@ public final class ComputeNode {
}
/**
- * Get online compute node instance label path.
+ * Get compute node instance label path.
*
* @param instanceId instance id
* @return path of compute node instance label
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 030eb42..cb1a58e 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -17,16 +17,17 @@
package org.apache.shardingsphere.mode.metadata.persist.service;
-import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.InstanceType;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -60,6 +61,17 @@ public final class ComputeNodePersistService {
}
/**
+ * Load instance status.
+ *
+ * @param instanceId instance id
+ * @return collection of status
+ */
+ public Collection<String> loadInstanceStatus(final String instanceId) {
+ String yamlContent = repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : YamlEngine.unmarshal(yamlContent, Collection.class);
+ }
+
+ /**
* Load all compute node instances by instance type.
*
* @param instanceType instance type
@@ -70,11 +82,31 @@ public final class ComputeNodePersistService {
List<ComputeNodeInstance> result = new ArrayList<>(onlineComputeNodes.size());
onlineComputeNodes.forEach(each -> {
ComputeNodeInstance instance = new ComputeNodeInstance();
- instance.setIp(Splitter.on("@").splitToList(each).get(0));
- instance.setPort(Splitter.on("@").splitToList(each).get(1));
+ instance.setInstanceDefinition(new InstanceDefinition(instanceType, each));
instance.setLabels(loadInstanceLabels(each));
+ instance.setStatus(loadInstanceStatus(each));
result.add(instance);
});
return result;
}
+
+ /**
+ * Load all compute node instances.
+ *
+ * @return collection of compute node instance
+ */
+ public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+ Collection<ComputeNodeInstance> result = new ArrayList<>();
+ Arrays.stream(InstanceType.values()).forEach(instanceType -> {
+ Collection<String> onlineComputeNodes = repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
+ onlineComputeNodes.forEach(each -> {
+ ComputeNodeInstance instance = new ComputeNodeInstance();
+ instance.setInstanceDefinition(new InstanceDefinition(instanceType, each));
+ instance.setLabels(loadInstanceLabels(each));
+ instance.setStatus(loadInstanceStatus(each));
+ result.add(instance);
+ });
+ });
+ return result;
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
index a7ee2e5..a33c4c9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.show.executor;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
-import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
@@ -71,12 +71,7 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
if (null == persistService || null == persistService.getRepository()) {
return new MultipleLocalDataMergedResult(buildInstanceRows());
}
- Collection<List<Object>> rows = buildInstanceRows(persistService, ENABLE);
- Collection<List<Object>> disableInstanceIds = buildInstanceRows(persistService, DISABLE);
- if (!disableInstanceIds.isEmpty()) {
- rows.addAll(disableInstanceIds);
- }
- return new MultipleLocalDataMergedResult(rows);
+ return new MultipleLocalDataMergedResult(buildInstanceRows(persistService));
}
private Collection<List<Object>> buildInstanceRows() {
@@ -87,11 +82,11 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
return rows;
}
- private Collection<List<Object>> buildInstanceRows(final MetaDataPersistService persistService, final String status) {
- String statusPath = ComputeStatusNode.getStatusPath(status.equals(ENABLE) ? ComputeNodeStatus.ONLINE : ComputeNodeStatus.CIRCUIT_BREAKER);
- List<String> instanceIds = persistService.getRepository().getChildrenKeys(statusPath);
- if (!instanceIds.isEmpty()) {
- return instanceIds.stream().filter(Objects::nonNull).map(each -> buildRow(each, status)).collect(Collectors.toCollection(LinkedList::new));
+ private Collection<List<Object>> buildInstanceRows(final MetaDataPersistService persistService) {
+ Collection<ComputeNodeInstance> instances = persistService.loadComputeNodeInstances();
+ if (!instances.isEmpty()) {
+ return instances.stream().filter(Objects::nonNull).map(each -> buildRow(each.getInstanceDefinition().getInstanceId().getId(), getStatus(each.getStatus())))
+ .collect(Collectors.toCollection(LinkedList::new));
}
return Collections.emptyList();
}
@@ -102,4 +97,8 @@ public final class ShowInstanceExecutor extends AbstractShowExecutor {
String port = splitInstanceId.length < 2 ? "" : splitInstanceId[1];
return Stream.of(instanceId, host, port, status).map(each -> (Object) each).collect(Collectors.toCollection(LinkedList::new));
}
+
+ private String getStatus(final Collection<String> computeNodeStatus) {
+ return computeNodeStatus.isEmpty() || !computeNodeStatus.contains(ComputeNodeStatus.CIRCUIT_BREAKER.name()) ? ENABLE : DISABLE;
+ }
}