You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:13 UTC
[helix] 35/50: Adjust the topology processing logic for instance to
ensure backward compatibility.
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit ad202c6e393403e079508e224e99b9ab8a9d226d
Author: jiajunwang <jj...@linkedin.com>
AuthorDate: Tue Oct 1 13:49:33 2019 -0700
Adjust the topology processing logic for instance to ensure backward compatibility.
---
.../rebalancer/waged/model/AssignableNode.java | 52 +++++++++-------------
.../rebalancer/waged/model/TestAssignableNode.java | 15 ++++---
2 files changed, 28 insertions(+), 39 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 2a68e15..3bfd225 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -28,16 +28,14 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
@@ -138,8 +136,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
}
Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName);
- if (!partitionMap.containsKey(partitionName)
- || !partitionMap.get(partitionName).equals(replica)) {
+ if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName)
+ .equals(replica)) {
LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.",
replica.toString(), getInstanceName());
return;
@@ -269,10 +267,9 @@ public class AssignableNode implements Comparable<AssignableNode> {
* For example, when
* the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function
* returns "2".
- * If cannot find the fault zone id, this function leaves the fault zone id as the instance name.
- * TODO merge this logic with Topology.java tree building logic.
- * For now, the WAGED rebalancer has a more strict topology def requirement.
- * Any missing field will cause an invalid topology config exception.
+ * If the fault zone type cannot be found, this function leaves the fault zone id as the instance name.
+ * Note that the WAGED rebalancer does not require the full topology tree to be created, so this logic
+ * is simpler than that of the CRUSH-based rebalancer.
*/
private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
if (!clusterConfig.isTopologyAwareEnabled()) {
@@ -290,36 +287,27 @@ public class AssignableNode implements Comparable<AssignableNode> {
return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
} else {
// Get the fault zone information from the complete topology definition.
- String[] topologyDef = topologyStr.trim().split("/");
- if (topologyDef.length == 0
- || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
+ String[] topologyKeys = topologyStr.trim().split("/");
+ if (topologyKeys.length == 0 || Arrays.stream(topologyKeys)
+ .noneMatch(type -> type.equals(faultZoneType))) {
throw new HelixException(
"The configured topology definition is empty or does not contain the fault zone type.");
}
Map<String, String> domainAsMap = instanceConfig.getDomainAsMap();
- if (domainAsMap == null) {
- throw new HelixException(
- String.format("The domain configuration of node %s is not configured", _instanceName));
- } else {
- StringBuilder faultZoneStringBuilder = new StringBuilder();
- for (String key : topologyDef) {
- if (!key.isEmpty()) {
- if (domainAsMap.containsKey(key)) {
- faultZoneStringBuilder.append(domainAsMap.get(key));
- faultZoneStringBuilder.append('/');
- } else {
- throw new HelixException(String.format(
- "The domain configuration of node %s is not complete. Type %s is not found.",
- _instanceName, key));
- }
- if (key.equals(faultZoneType)) {
- break;
- }
+ StringBuilder faultZoneStringBuilder = new StringBuilder();
+ for (String key : topologyKeys) {
+ if (!key.isEmpty()) {
+ // If a key does not exist in the instance domain config, fall back to the default value "Default_<key>".
+ faultZoneStringBuilder.append(domainAsMap.getOrDefault(key, "Default_" + key));
+ if (key.equals(faultZoneType)) {
+ break;
+ } else {
+ faultZoneStringBuilder.append('/');
}
}
- return faultZoneStringBuilder.toString();
}
+ return faultZoneStringBuilder.toString();
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index b48587f..e8b010e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -189,24 +189,25 @@ public class TestAssignableNode extends AbstractTestClusterModel {
assignableNode.assign(duplicateReplica);
}
- @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.")
+ @Test
public void testParseFaultZoneNotFound() throws IOException {
ResourceControllerDataProvider testCache = setupClusterDataCache();
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
- testClusterConfig.setFaultZoneType("DOES_NOT_EXIST");
+ testClusterConfig.setFaultZoneType("zone");
testClusterConfig.setTopologyAwareEnabled(true);
- testClusterConfig.setTopology("/DOES_NOT_EXIST/");
+ testClusterConfig.setTopology("/zone/");
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
- testInstanceConfig.setDomain("zone=2, instance=testInstance");
+ testInstanceConfig.setDomain("instance=testInstance");
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put(_testInstanceId, testInstanceConfig);
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
- new AssignableNode(testCache.getClusterConfig(),
+ AssignableNode node = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ Assert.assertEquals(node.getFaultZone(), "Default_zone");
}
@Test
@@ -228,7 +229,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
- Assert.assertEquals(assignableNode.getFaultZone(), "2/");
+ Assert.assertEquals(assignableNode.getFaultZone(), "2");
testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig.setFaultZoneType("instance");
@@ -245,7 +246,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
- Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
+ Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
}
@Test