You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/11/18 19:04:25 UTC
[helix] 16/18: Add TrieClusterTopology for retrieving hierarchical
topology (#1307)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git
commit bdcb37a1947b4929647a0b8dd0e3d16640bc9d9f
Author: Meng Zhang <mn...@linkedin.com>
AuthorDate: Thu Sep 24 11:38:51 2020 -0700
Add TrieClusterTopology for retrieving hierarchical topology (#1307)
Add TrieNode class to define a node in the trie
Add ClusterTrie class to handle the construction, validation and retrieval of nodes/paths in the trie.
Add ClusterTopology class to provide different APIs for users to retrieve cluster topology information.
Add APIs in HelixAdmin to retrieve ClusterTopology of a specific cluster.
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 7 +
.../apache/helix/api/topology/ClusterTopology.java | 192 +++++++++++++++++
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 20 ++
.../java/org/apache/helix/model/ClusterTrie.java | 227 +++++++++++++++++++++
.../main/java/org/apache/helix/model/TrieNode.java | 56 +++++
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 76 ++++++-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 6 +
.../org/apache/helix/model/TestClusterTrie.java | 141 +++++++++++++
8 files changed, 724 insertions(+), 1 deletion(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index bb5f3bf..42bcb24 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -394,6 +395,12 @@ public interface HelixAdmin {
void removeCloudConfig(String clusterName);
/**
+ * Get the topology of a specific cluster
+ * @param clusterName name of the cluster whose topology is requested
+ * @return the {@link ClusterTopology} of the given cluster
+ */
+ ClusterTopology getClusterTopology(String clusterName);
+
+ /**
* Get a list of state model definitions in a cluster
* @param clusterName
* @return
diff --git a/helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java b/helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
new file mode 100644
index 0000000..72bc594
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
@@ -0,0 +1,192 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+ private static Logger logger = LoggerFactory.getLogger(ClusterTopology.class);
+
+ private final ClusterTrie _trieClusterTopology;
+
+ /**
+ * Create a topology view backed by a {@link ClusterTrie} built from the given inputs.
+ * @param liveNodes names of the live instances, handed to the trie unchanged
+ * @param instanceConfigMap instance configs keyed by instance name, handed to the trie unchanged
+ * @param clusterConfig the cluster config, handed to the trie unchanged
+ */
+ public ClusterTopology(final List<String> liveNodes,
+ final Map<String, InstanceConfig> instanceConfigMap, final ClusterConfig clusterConfig) {
+ _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, clusterConfig);
+ }
+
+ /**
+ * Return the whole topology of a cluster as a map. The key of the map is the path of a
+ * first-level domain, and the value is a list of strings, each one the path from that domain
+ * down to an end node under it. E.g., assume the topology is defined as /group/zone/rack/host,
+ * the result may be {"/group:0": ["/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"],
+ * "/group:1": ["/zone:1/rack:1/host:2", "/zone:1/rack:1/host:3"]}
+ * @return the map described above; it is obtained by querying the topology with an empty domain
+ */
+ public Map<String, List<String>> getTopologyMap() {
+ return getTopologyUnderDomain(Collections.emptyMap());
+ }
+
+ /**
+ * Return all the instances under fault zone type. The key of the returned map is the path of
+ * each fault zone, and the value is a list of strings, each one the path from that fault zone
+ * down to an end node under it.
+ * @return e.g. if the fault zone is "zone", it may return {"/group:0/zone:0":
+ * ["/rack:0/host:0", "/rack:1/host:1"], "/group:0/zone:1": ["/rack:0/host:2", "/rack:1/host:3"]}
+ * @throws IllegalArgumentException if no fault zone type is available from the cluster config
+ */
+ public Map<String, List<String>> getFaultZoneMap() {
+ String faultZone = _trieClusterTopology.getFaultZoneType();
+ if (faultZone == null) {
+ throw new IllegalArgumentException("The fault zone in cluster config is not defined");
+ }
+ return getTopologyUnderDomainType(faultZone);
+ }
+
+ /**
+ * Return the instances whose domain field is not valid
+ */
+ public List<String> getInvalidInstances() {
+ return _trieClusterTopology.getInvalidInstances();
+ }
+
+ /**
+  * Collect the topology below the node addressed by the given domain. Each key of the result is
+  * the path of one child of that node; each value lists the paths from that child down to every
+  * end node beneath it, with the child's own path removed.
+  * @param domainMap domain type to value, e.g. {"group": "1", "zone": "2"}; it is validated and
+  *          ordered by the cluster topology definition before the lookup
+  * @return e.g. {"/group:1/zone:2/rack:0": ["/host:0", "/host:1"],
+  *         "/group:1/zone:2/rack:1": ["/host:2", "/host:3"]}
+  */
+ private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> domainMap) {
+   TrieNode domainNode = _trieClusterTopology.getNode(validateAndOrderDomain(domainMap));
+   String parentPath = domainNode.getPath();
+   Map<String, List<String>> topology = new HashMap<>();
+   for (Map.Entry<String, TrieNode> childEntry : domainNode.getChildren().entrySet()) {
+     TrieNode childNode = childEntry.getValue();
+     Set<String> endNodePaths = _trieClusterTopology.getPathUnderNode(childNode);
+     topology.put(parentPath + DELIMITER + childEntry.getKey(),
+         truncatePath(endNodePaths, childNode.getPath()));
+   }
+   return topology;
+ }
+
+ /**
+  * Build the topology map for every node of one domain type.
+  * @param domainType a specific type of domain, e.g. zone
+  * @return the topology of the given domain type, e.g. {"/group:0/zone:0": ["/rack:0/host:0",
+  *         "/rack:1/host:1"], "/group:0/zone:1": ["/rack:0/host:2", "/rack:1/host:3"]}
+  */
+ private Map<String, List<String>> getTopologyUnderDomainType(String domainType) {
+   String[] keys = _trieClusterTopology.getTopologyKeys();
+   if (domainType.equals(keys[0])) {
+     // The first level is exactly what the whole-cluster view returns.
+     return getTopologyMap();
+   }
+   // Find the level of the requested type; its parent level supplies the starting nodes.
+   int typeLevel = -1;
+   for (int level = 1; level < keys.length && typeLevel < 0; level++) {
+     if (keys[level].equals(domainType)) {
+       typeLevel = level;
+     }
+   }
+   String parentType = typeLevel < 0 ? null : keys[typeLevel - 1];
+   Map<String, List<String>> topology = new HashMap<>();
+   for (TrieNode parentNode : _trieClusterTopology.getStartNodes(parentType)) {
+     topology.putAll(getTopologyUnderPath(parentNode.getPath()));
+   }
+   return topology;
+ }
+
+ /**
+  * Same as {@link #getTopologyUnderDomain(Map)}, but the starting node is given as a trie path
+  * instead of a domain map.
+  * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+  * @return the topology under that node, e.g. {"/group:1/zone:2/rack:0": ["/host:0", "/host:1"],
+  *         "/group:1/zone:2/rack:1": ["/host:2", "/host:3"]}
+  */
+ private Map<String, List<String>> getTopologyUnderPath(String path) {
+   return getTopologyUnderDomain(convertPathToDomain(path));
+ }
+
+ /**
+  * Validate that the provided domain covers a continuous run of fields starting from the top of
+  * the cluster topology definition, and return it ordered by that definition. E.g. if the
+  * cluster topology is /group/zone/rack/instance and the domain is given as {"zone": "1",
+  * "group": "2"}, the result is a LinkedHashMap ordered as {"group": "2", "zone": "1"}.
+  * @param domainMap domain type to value; must not be null
+  * @return the same entries in topology order
+  * @throws IllegalArgumentException if the domain is null, has more entries than the topology
+  *           defines, or is missing one of the leading topology keys
+  */
+ private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, String> domainMap) {
+   if (domainMap == null) {
+     throw new IllegalArgumentException("The domain should not be null");
+   }
+   String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+   // Without this guard the loop below would index past the end of topologyKeys.
+   if (domainMap.size() > topologyKeys.length) {
+     throw new IllegalArgumentException(String.format(
+         "The input domain is not valid, it has %d keys but the topology only defines %d",
+         domainMap.size(), topologyKeys.length));
+   }
+   LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+   for (int i = 0; i < domainMap.size(); i++) {
+     String key = topologyKeys[i];
+     if (!domainMap.containsKey(key)) {
+       throw new IllegalArgumentException(
+           String.format("The input domain is not valid, the key %s is required", key));
+     }
+     orderedDomain.put(key, domainMap.get(key));
+   }
+   return orderedDomain;
+ }
+
+ /**
+  * Truncate each path in the given set so that only the part from the current node's children
+  * down to each end node remains.
+  * @param paths full paths to truncate
+  * @param toRemovePath The path from root to current node. It is stripped from the front of
+  *          every path that starts with it, so that users can get a better view.
+  * @return the truncated paths; a path that does not start with {@code toRemovePath} is kept
+  *         unchanged
+  */
+ private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+   List<String> results = new ArrayList<>(paths.size());
+   for (String path : paths) {
+     // Strip the leading prefix only; replace() would also drop later occurrences.
+     results.add(path.startsWith(toRemovePath) ? path.substring(toRemovePath.length()) : path);
+   }
+   return results;
+ }
+
+ /**
+  * Parse a trie path back into a domain map. The leading character is dropped, the rest is
+  * split on {@code DELIMITER}, and each part is split at its first {@code CONNECTOR} into a
+  * domain type and its value.
+  * @param path a trie path, e.g. /group:1/zone:2
+  * @return domain type to value, e.g. {"group": "1", "zone": "2"}
+  */
+ private Map<String, String> convertPathToDomain(String path) {
+   Map<String, String> domain = new HashMap<>();
+   String[] segments = path.substring(1).split(DELIMITER);
+   for (String segment : segments) {
+     int connectorAt = segment.indexOf(CONNECTOR);
+     String domainType = segment.substring(0, connectorAt);
+     String domainValue = segment.substring(connectorAt + 1);
+     domain.put(domainType, domainValue);
+   }
+   return domain;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 8a0d72c..0fdc3b4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -53,6 +53,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -1047,6 +1048,25 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public ClusterTopology getClusterTopology(String clusterName) {
+ // Load the instance configs stored under the cluster's instance-config path and key them by
+ // instance name.
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ String path = PropertyPathBuilder.instanceConfig(clusterName);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
+ // NOTE(review): the meaning of the trailing (null, 0, 0, 0) arguments is defined by
+ // BaseDataAccessor#getChildren, which is not visible here - confirm.
+ List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0, 0, 0);
+ for (ZNRecord record : znRecords) {
+ // Null records are skipped rather than treated as an error.
+ if (record != null) {
+ InstanceConfig instanceConfig = new InstanceConfig(record);
+ instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig);
+ }
+ }
+ // The live nodes are the child names under the cluster's live-instance path.
+ path = PropertyPathBuilder.liveInstance(clusterName);
+ List<String> liveNodes = baseAccessor.getChildNames(path, 0);
+ ConfigAccessor configAccessor = new ConfigAccessor(_zkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ return new ClusterTopology(liveNodes, instanceConfigMap, clusterConfig);
+ }
+
+ @Override
public List<String> getStateModelDefs(String clusterName) {
return _zkClient.getChildren(PropertyPathBuilder.stateModelDef(clusterName));
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
new file mode 100644
index 0000000..f1f91a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
@@ -0,0 +1,227 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster topology. Each node
+ * except the terminal node represents a certain domain in the topology, and a terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+ public static final String DELIMITER = "/";
+ public static final String CONNECTOR = ":";
+
+ private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+ private TrieNode _rootNode;
+ private String[] _topologyKeys;
+ private String _faultZoneType;
+ private List<String> _invalidInstances = new ArrayList<>();
+
+ /**
+  * Build the trie for a cluster. Instances whose domain is incomplete or cannot be parsed are
+  * recorded as invalid and left out of the trie.
+  * @param liveNodes names of the live instances; each must have an entry in instanceConfigMap
+  * @param instanceConfigMap instance configs keyed by instance name; it is not modified
+  * @param clusterConfig supplies the topology definition and the fault zone type
+  * @throws HelixException if a live node has no instance config, or the topology definition is
+  *           missing or invalid
+  */
+ public ClusterTrie(final List<String> liveNodes,
+     final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+   validateInstanceConfig(liveNodes, instanceConfigMap);
+   _topologyKeys = getTopologyDef(clusterConfig);
+   _faultZoneType = clusterConfig.getFaultZoneType();
+   _invalidInstances = getInvalidInstancesFromConfig(instanceConfigMap, _topologyKeys);
+   // Filter on a copy so the caller's map is left untouched and may even be unmodifiable.
+   Map<String, InstanceConfig> validInstanceConfigMap = new HashMap<>(instanceConfigMap);
+   validInstanceConfigMap.keySet().removeAll(_invalidInstances);
+   _rootNode = constructTrie(validInstanceConfigMap, _topologyKeys);
+ }
+
+ /** Return the root of the trie; the root is created with an empty path and the key "ROOT". */
+ public TrieNode getRootNode() {
+ return _rootNode;
+ }
+
+ /** Return the topology keys parsed from the cluster config, in topology order. */
+ public String[] getTopologyKeys() {
+ return _topologyKeys;
+ }
+
+ /** Return the fault zone type read from the cluster config. */
+ public String getFaultZoneType() {
+ return _faultZoneType;
+ }
+
+ /**
+ * Return the names of the instances that were left out of the trie because their domain has
+ * no value for some topology key or could not be read.
+ */
+ public List<String> getInvalidInstances() {
+ return _invalidInstances;
+ }
+
+ /**
+ * Return all the paths from a TrieNode as a set.
+ * @param node the node from where to collect all the nodes' paths.
+ * @return All the paths under the node.
+ */
+ public Set<String> getPathUnderNode(TrieNode node) {
+ Set<String> resultMap = new HashSet<>();
+ Deque<TrieNode> nodeStack = new ArrayDeque<>();
+ nodeStack.push(node);
+ while (!nodeStack.isEmpty()) {
+ node = nodeStack.pop();
+ if (node.getChildren().isEmpty()) {
+ resultMap.add(node.getPath());
+ } else {
+ for (TrieNode child : node.getChildren().values()) {
+ nodeStack.push(child);
+ }
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+  * Walk down from the root following the given domain entries in iteration order and return
+  * the node reached.
+  * @param domainMap domain type to value, ordered from the top of the topology downwards
+  * @return the trie node addressed by the domain; the root if the map is empty
+  * @throws IllegalArgumentException if some entry has no matching child in the trie
+  */
+ public TrieNode getNode(LinkedHashMap<String, String> domainMap) {
+   TrieNode current = _rootNode;
+   for (Map.Entry<String, String> domain : domainMap.entrySet()) {
+     String childKey = domain.getKey() + CONNECTOR + domain.getValue();
+     TrieNode child = current.getChildren().get(childKey);
+     if (child == null) {
+       throw new IllegalArgumentException(String
+           .format("The input domain %s does not have the value %s", domain.getKey(),
+               domain.getValue()));
+     }
+     current = child;
+   }
+   return current;
+ }
+
+ /**
+  * Find every node whose node key equals the given domain type. E.g., if the domainType is
+  * "zone", the result holds the trie nodes that represent zone:0, zone:1, zone:2, etc. The
+  * search starts at the root and does not descend below a matching node.
+  * @param domainType a specific domain type
+  * @return the matching trie nodes; empty if no node has that key
+  */
+ public List<TrieNode> getStartNodes(String domainType) {
+   List<TrieNode> matches = new ArrayList<>();
+   Deque<TrieNode> pending = new ArrayDeque<>();
+   pending.push(_rootNode);
+   while (!pending.isEmpty()) {
+     TrieNode candidate = pending.pop();
+     if (candidate.getNodeKey().equals(domainType)) {
+       matches.add(candidate);
+       continue;
+     }
+     candidate.getChildren().values().forEach(pending::push);
+   }
+   return matches;
+ }
+
+ /**
+  * Verify that every live node has an instance config.
+  * @param liveNodes names of the live instances
+  * @param instanceConfigMap instance configs keyed by instance name; may be null
+  * @throws HelixException naming the live nodes that have no config; if the map itself is null,
+  *           all live nodes are named
+  */
+ private void validateInstanceConfig(final List<String> liveNodes,
+     final Map<String, InstanceConfig> instanceConfigMap) {
+   if (instanceConfigMap == null) {
+     throw new HelixException(
+         String.format("Config for instances %s is not found!", liveNodes));
+   }
+   // removeAll() only returns a boolean, so build the list of missing names before formatting.
+   List<String> missingInstances = new ArrayList<>(liveNodes);
+   missingInstances.removeAll(instanceConfigMap.keySet());
+   if (!missingInstances.isEmpty()) {
+     throw new HelixException(
+         String.format("Config for instances %s is not found!", missingInstances));
+   }
+ }
+
+ /**
+  * Find the instances that cannot be placed in the trie: those whose domain has no (or an
+  * empty) value for some topology key, and those whose domain cannot be read as a map.
+  * @param instanceConfigMap instance configs keyed by instance name
+  * @param topologyKeys the keys every instance domain must define
+  * @return the names of the invalid instances
+  */
+ private List<String> getInvalidInstancesFromConfig(Map<String, InstanceConfig> instanceConfigMap,
+     final String[] topologyKeys) {
+   List<String> invalidInstances = new ArrayList<>();
+   for (Map.Entry<String, InstanceConfig> entry : instanceConfigMap.entrySet()) {
+     String instanceName = entry.getKey();
+     try {
+       Map<String, String> domainAsMap = entry.getValue().getDomainAsMap();
+       for (String key : topologyKeys) {
+         String value = domainAsMap.get(key);
+         if (value == null || value.isEmpty()) {
+           // Report the missing key; the value is null or empty here by definition.
+           logger.info("Domain {} for instance {} is not set", key, instanceName);
+           invalidInstances.add(instanceName);
+           break;
+         }
+       }
+     } catch (IllegalArgumentException e) {
+       // Still treated as an invalid instance, but leave a trace of why.
+       logger.info("Domain of instance {} cannot be parsed", instanceName, e);
+       invalidInstances.add(instanceName);
+     }
+   }
+   return invalidInstances;
+ }
+
+ /**
+  * Parse the topology definition of the cluster into its keys, e.g. a cluster topology defined
+  * as /group/zone/rack/host yields ["group", "zone", "rack", "host"]. Keys are trimmed and
+  * empty segments are dropped.
+  * Note that we do not validate whether topology-aware is enabled or fault zone type is
+  * defined, as they do not block the construction of the trie.
+  * @throws HelixException if the definition is null, does not start with the delimiter, or
+  *           contains no key
+  */
+ private String[] getTopologyDef(ClusterConfig clusterConfig) {
+   String rawTopology = clusterConfig.getTopology();
+   boolean startsWithDelimiter = rawTopology != null && rawTopology.trim().startsWith(DELIMITER);
+   if (!startsWithDelimiter) {
+     throw new HelixException(String.format("The topology of cluster %s is invalid!",
+         clusterConfig.getClusterName()));
+   }
+   String[] topologyKeys = Arrays.stream(rawTopology.split(DELIMITER))
+       .map(String::trim)
+       .filter(key -> !key.isEmpty())
+       .toArray(String[]::new);
+   if (topologyKeys.length == 0) {
+     throw new HelixException(String.format("The topology of cluster %s is not correctly defined",
+         clusterConfig.getClusterName()));
+   }
+   return topologyKeys;
+ }
+
+ /**
+  * Build the trie top-down from the given instance configs. For every instance one node per
+  * topology key is visited, from the first key to the last; a node is created the first time
+  * its key is seen under its parent and reused afterwards.
+  * @param instanceConfigMap instance configs keyed by instance name
+  * @param topologyKeys the topology keys, in topology order
+  * @return the root node, created with an empty path and the key "ROOT"
+  */
+ private TrieNode constructTrie(Map<String, InstanceConfig> instanceConfigMap,
+     final String[] topologyKeys) {
+   TrieNode root = new TrieNode("", "ROOT");
+   for (InstanceConfig instanceConfig : instanceConfigMap.values()) {
+     Map<String, String> domain = instanceConfig.getDomainAsMap();
+     TrieNode parent = root;
+     StringBuilder pathSoFar = new StringBuilder();
+     for (String topologyKey : topologyKeys) {
+       String childKey = topologyKey + CONNECTOR + domain.get(topologyKey);
+       pathSoFar.append(DELIMITER).append(childKey);
+       TrieNode child = parent.getChildren().get(childKey);
+       if (child == null) {
+         child = new TrieNode(pathSoFar.toString(), topologyKey);
+         parent.addChild(childKey, child);
+       }
+       parent = child;
+     }
+   }
+   return root;
+ }
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/model/TrieNode.java b/helix-core/src/main/java/org/apache/helix/model/TrieNode.java
new file mode 100644
index 0000000..e58ae90
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/TrieNode.java
@@ -0,0 +1,56 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A node of the cluster trie. It stores the full path leading to it, the key it was created
+ * with, and its children indexed by trie key.
+ */
+public class TrieNode {
+  // Children of this node, indexed by their trie key.
+  private final Map<String, TrieNode> _children = new HashMap<>();
+
+  // The complete path/prefix leading to this node.
+  private final String _path;
+
+  // The key this node was created with.
+  private final String _nodeKey;
+
+  TrieNode(String path, String nodeKey) {
+    this._path = path;
+    this._nodeKey = nodeKey;
+  }
+
+  /** Return the children map itself (not a copy), indexed by trie key. */
+  public Map<String, TrieNode> getChildren() {
+    return this._children;
+  }
+
+  /** Return the complete path leading to this node. */
+  public String getPath() {
+    return this._path;
+  }
+
+  /** Return the key this node was created with. */
+  public String getNodeKey() {
+    return this._nodeKey;
+  }
+
+  /** Register {@code node} as the child stored under {@code key}, replacing any previous one. */
+  public void addChild(String key, TrieNode node) {
+    this._children.put(key, node);
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index f7fe23c..0769558 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -23,8 +23,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
@@ -39,9 +42,11 @@ import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -51,6 +56,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -117,7 +123,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
try {
tool.addInstance(clusterName, config);
- Assert.fail("should fail if add an alredy-existing instance");
+ Assert.fail("should fail if add an already-existing instance");
} catch (HelixException e) {
// OK
}
@@ -574,4 +580,72 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
Assert.assertNull(cloudConfigFromZk);
}
+
+ @Test
+ public void testGetDomainInformation() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName, true);
+ ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+ clusterConfig.setTopologyAwareEnabled(true);
+ clusterConfig.setTopology("/group/zone/rack/host");
+ clusterConfig.setFaultZoneType("rack");
+
+ ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // Register 42 live instances: 0-39 get a complete domain, 40 and 41 get a broken one.
+ for (int i = 0; i < 42; i++) {
+
+ String hostname = "myhost" + i;
+ String port = "9999";
+ String instanceName = hostname + "_" + port;
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ instanceConfig.setHostName(hostname);
+ instanceConfig.setPort(port);
+ if (i == 40) {
+ // The "group" key is replaced by an unknown key, so the domain lacks a topology key.
+ instanceConfig.setDomain(String
+ .format("invaliddomain=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, "myzone" + i % 4,
+ "myrack" + i % 4, hostname));
+ } else if (i == 41) {
+ // The domain string has no key=value pairs at all.
+ instanceConfig.setDomain("invaliddomain");
+ } else {
+ // 2 groups (i % 2); zone and rack both use i % 4, giving 4 distinct racks overall.
+ String domain = String
+ .format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, "myzone" + i % 4,
+ "myrack" + i % 4, hostname);
+ instanceConfig.setDomain(domain);
+ }
+ LiveInstance liveInstance = new LiveInstance(instanceName);
+ liveInstance.setSessionId(UUID.randomUUID().toString());
+ liveInstance.setHelixVersion(UUID.randomUUID().toString());
+ accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
+ admin.addInstance(clusterName, instanceConfig);
+ admin.enableInstance(clusterName, instanceName, true);
+ }
+
+ ClusterTopology clusterTopology = admin.getClusterTopology(clusterName);
+ Assert.assertNotNull(clusterTopology);
+ // The 40 valid instances split evenly over the two groups.
+ Map<String, List<String>> results = clusterTopology.getTopologyMap();
+ Assert.assertEquals(results.size(), 2);
+ Assert.assertTrue(results.containsKey("/group:mygroup0"));
+ Assert.assertTrue(results.containsKey("/group:mygroup1"));
+ Assert.assertEquals(results.get("/group:mygroup0").size(), 20);
+ Assert.assertEquals(results.get("/group:mygroup1").size(), 20);
+
+ // The fault zone type is "rack": 4 racks with 10 valid instances each.
+ results = clusterTopology.getFaultZoneMap();
+ Assert.assertEquals(results.size(), 4);
+ Assert.assertEquals(results.get("/group:mygroup0/zone:myzone0/rack:myrack0").size(), 10);
+ Assert.assertTrue(results.get("/group:mygroup0/zone:myzone0/rack:myrack0").contains("/host"
+ + ":myhost0"));
+
+ // Instances 40 and 41 must be reported as invalid.
+ Assert.assertEquals(clusterTopology.getInvalidInstances().size(), 2);
+ Assert.assertTrue(clusterTopology.getInvalidInstances()
+ .containsAll(new HashSet<>(Arrays.asList("myhost40_9999", "myhost41_9999"))));
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index e06c902..19f0875 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
@@ -323,6 +324,11 @@ public class MockHelixAdmin implements HelixAdmin {
}
+ @Override
+ public ClusterTopology getClusterTopology(String clusterName) {
+ // The mock does not build a topology; it always returns null.
+ return null;
+ }
+
@Override public List<String> getStateModelDefs(String clusterName) {
return null;
}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTrie.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTrie.java
new file mode 100644
index 0000000..29385af
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTrie.java
@@ -0,0 +1,141 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestClusterTrie {
+ private ClusterTrie _trie;
+
+ final List<String> _instanceNames = new ArrayList<>();
+ final Map<String, InstanceConfig> _instanceConfigMap = new HashMap<>();
+ private ClusterConfig _clusterConfig;
+ final int _numOfNodes = 40;
+
+ @BeforeClass
+ public void beforeClass() {
+ for (int i = 0; i < _numOfNodes; i++) {
+ _instanceNames.add(String.valueOf(i));
+ }
+ createClusterConfig();
+ createInstanceConfigMap();
+ }
+
+ @Test
+ public void testConstructionMissingInstanceConfigMap() {
+ Map<String, InstanceConfig> emptyMap = new HashMap<>();
+ try {
+ new ClusterTrie(_instanceNames, emptyMap, _clusterConfig);
+ Assert.fail("Expecting instance config not found exception");
+ } catch (HelixException e) {
+ Assert.assertTrue(e.getMessage().contains("is not found!"));
+ }
+ }
+
+ @Test
+ public void testConstructionMissingTopology() {
+   // A null topology must be rejected with the "is invalid!" HelixException.
+   _clusterConfig.setTopology(null);
+   try {
+     new ClusterTrie(_instanceNames, _instanceConfigMap, _clusterConfig);
+     Assert.fail("Expecting topology not set exception");
+   } catch (HelixException e) {
+     Assert.assertTrue(e.getMessage().contains("is invalid!"));
+   } finally {
+     // Restore the shared config even when an assertion fails, so other tests are unaffected.
+     _clusterConfig.setTopology("/group/zone/rack/host");
+   }
+ }
+
+ @Test
+ public void testConstructionInvalidTopology() {
+   // A topology that does not start with the delimiter must be rejected.
+   _clusterConfig.setTopology("invalidTopology");
+   try {
+     new ClusterTrie(_instanceNames, _instanceConfigMap, _clusterConfig);
+     Assert.fail("Expecting topology invalid exception");
+   } catch (HelixException e) {
+     Assert.assertTrue(e.getMessage().contains("is invalid!"));
+   } finally {
+     // Restore the shared config even when an assertion fails, so other tests are unaffected.
+     _clusterConfig.setTopology("/group/zone/rack/host");
+   }
+ }
+
+ @Test
+ public void testConstructionNormal() {
+ try {
+ _trie = new ClusterTrie(_instanceNames, _instanceConfigMap, _clusterConfig);
+ } catch (HelixException e) {
+ Assert.fail("Not expecting HelixException");
+ }
+ }
+
+ @Test
+ public void testConstructionNormalWithSpace() {
+   // Whitespace around topology keys must be trimmed away.
+   _clusterConfig.setTopology("/ group/ zone/rack/host");
+   try {
+     _trie = new ClusterTrie(_instanceNames, _instanceConfigMap, _clusterConfig);
+     String[] topologyDef = _trie.getTopologyKeys();
+     Assert.assertEquals(topologyDef[0], "group");
+     Assert.assertEquals(topologyDef[1], "zone");
+   } catch (HelixException e) {
+     Assert.fail("Not expecting HelixException", e);
+   } finally {
+     // Restore the shared config even when an assertion fails, so other tests are unaffected.
+     _clusterConfig.setTopology("/group/zone/rack/host");
+   }
+ }
+
+ @Test
+ public void testConstructionNormalWithInvalidConfig() {
+   // An instance whose domain lacks the "group" key must be reported as invalid, not fail
+   // the construction.
+   String instance = "invalidInstance";
+   InstanceConfig config = new InstanceConfig(instance);
+   config.setDomain(String.format("invaliddomain=%s, zone=%s, rack=%s, host=%s", 1, 2, 3, 4));
+   _instanceConfigMap.put(instance, config);
+   try {
+     _trie = new ClusterTrie(_instanceNames, _instanceConfigMap, _clusterConfig);
+     Assert.assertEquals(_trie.getInvalidInstances().size(), 1);
+     Assert.assertEquals(_trie.getInvalidInstances().get(0), instance);
+   } catch (HelixException e) {
+     Assert.fail("Not expecting HelixException", e);
+   } finally {
+     // Remove the extra entry from the shared map even when an assertion fails.
+     _instanceConfigMap.remove(instance);
+   }
+ }
+
+ // Populate the shared instance config map with one config per instance name.
+ private void createInstanceConfigMap() {
+ for (int i = 0; i < _instanceNames.size(); i++) {
+ String instance = _instanceNames.get(i);
+ InstanceConfig config = new InstanceConfig(instance);
+ // With _numOfNodes = 40 this creates 4 groups (i % 4), 8 zones and 8 racks (i % 8).
+ config.setDomain(String.format("group=%s, zone=%s, rack=%s, host=%s", i % (_numOfNodes / 10),
+ i % (_numOfNodes / 5), i % (_numOfNodes / 5), instance));
+ _instanceConfigMap.put(instance, config);
+ }
+ }
+
+ private void createClusterConfig() {
+ _clusterConfig = new ClusterConfig("test");
+ _clusterConfig.setTopologyAwareEnabled(true);
+ _clusterConfig.setTopology("/group/zone/rack/host");
+ _clusterConfig.setFaultZoneType("rack");
+ }
+}
\ No newline at end of file