You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:31 UTC
[helix] 28/37: Adjust the expected replica count according to fault
zone count. (#476)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 679547fb511754cce2863a51e10ffcc6d8ab98b0
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Sep 17 13:41:56 2019 -0700
Adjust the expected replica count according to fault zone count. (#476)
The rebalancer should determine the expected replica count based on the number of fault zones, rather than on the node count alone.
---
.../rebalancer/waged/model/AssignableNode.java | 56 ++++++++++------------
.../waged/model/ClusterModelProvider.java | 28 ++++++-----
.../waged/model/ClusterModelTestHelper.java | 3 +-
.../rebalancer/waged/model/TestAssignableNode.java | 24 ++++------
.../rebalancer/waged/model/TestClusterModel.java | 3 +-
.../waged/model/TestClusterModelProvider.java | 33 ++++++++-----
6 files changed, 76 insertions(+), 71 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 6966353..a3460fb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -64,12 +64,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
* @param clusterConfig
* @param instanceConfig
* @param instanceName
- * @param existingAssignment A collection of replicas that have been pre-allocated to the node.
*/
- AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
- Collection<AssignableReplica> existingAssignment) {
+ AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
_instanceName = instanceName;
- refresh(clusterConfig, instanceConfig, existingAssignment);
+ refresh(clusterConfig, instanceConfig);
}
private void reset() {
@@ -88,10 +86,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
* subject to change. If the assumption is no longer true, this function should become private.
* @param clusterConfig - the Cluster Config of the cluster where the node is located
* @param instanceConfig - the Instance Config of the node
- * @param existingAssignment - all the existing replicas that are current assigned to the node
*/
- private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
- Collection<AssignableReplica> existingAssignment) {
+ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
reset();
Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
@@ -101,8 +97,29 @@ public class AssignableNode implements Comparable<AssignableNode> {
_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
_maxCapacity = instanceCapacity;
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+ }
+
+ /**
+ * This function should only be used to assign a set of new partitions that are not allocated on
+ * this node.
+ * Using this function avoids the overhead of updating capacity repeatedly.
+ */
+ void assignNewBatch(Collection<AssignableReplica> replicas) {
+ Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+ for (AssignableReplica replica : replicas) {
+ addToAssignmentRecord(replica);
+ // increment the capacity requirement according to partition's capacity configuration.
+ for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+ totalPartitionCapacity.compute(capacity.getKey(),
+ (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+ : totalValue + capacity.getValue());
+ }
+ }
- assignNewBatch(existingAssignment);
+ // Update the global state after all single replications' calculation is done.
+ for (String key : totalPartitionCapacity.keySet()) {
+ updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+ }
}
/**
@@ -315,29 +332,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
}
/**
- * This function should only be used to assign a set of new partitions that are not allocated on
- * this node.
- * Using this function avoids the overhead of updating capacity repeatedly.
- */
- private void assignNewBatch(Collection<AssignableReplica> replicas) {
- Map<String, Integer> totalPartitionCapacity = new HashMap<>();
- for (AssignableReplica replica : replicas) {
- addToAssignmentRecord(replica);
- // increment the capacity requirement according to partition's capacity configuration.
- for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
- totalPartitionCapacity.compute(capacity.getKey(),
- (key, totalValue) -> (totalValue == null) ? capacity.getValue()
- : totalValue + capacity.getValue());
- }
- }
-
- // Update the global state after all single replications' calculation is done.
- for (String key : totalPartitionCapacity.keySet()) {
- updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
- }
- }
-
- /**
* @throws HelixException if the replica has already been assigned to the node.
*/
private void addToAssignmentRecord(AssignableReplica replica) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 3570164..20024c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -62,10 +62,15 @@ public class ClusterModelProvider {
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
+ // Construct all the assignable nodes and initialize with the allocated replicas.
+ Set<AssignableNode> assignableNodes =
+ parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+ activeInstances);
+
// Generate replica objects for all the resource partitions.
// <resource, replica set>
Map<String, Set<AssignableReplica>> replicaMap =
- parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+ parseAllReplicas(dataProvider, resourceMap, assignableNodes);
// Check if the replicas need to be reassigned.
Map<String, Set<AssignableReplica>> allocatedReplicas =
@@ -74,10 +79,9 @@ public class ClusterModelProvider {
findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
bestPossibleAssignment, allocatedReplicas);
- // Construct all the assignable nodes and initialize with the allocated replicas.
- Set<AssignableNode> assignableNodes =
- parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
- activeInstances, allocatedReplicas);
+ // Update the allocated replicas to the assignable nodes.
+ assignableNodes.stream().forEach(node -> node.assignNewBatch(
+ allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
@@ -171,15 +175,13 @@ public class ClusterModelProvider {
* @param clusterConfig The cluster configuration.
* @param instanceConfigMap A map of all the instance configuration.
* @param activeInstances All the instances that are online and enabled.
- * @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance.
* @return A map of assignable node set, <InstanceName, node set>.
*/
private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
- Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
- Map<String, Set<AssignableReplica>> allocatedReplicas) {
+ Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
return activeInstances.stream().map(
instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
- instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
+ instanceName))
.collect(Collectors.toSet());
}
@@ -188,11 +190,12 @@ public class ClusterModelProvider {
*
* @param dataProvider The cluster status cache that contains the current cluster status.
* @param resourceMap All the valid resources that are managed by the rebalancer.
+ * @param assignableNodes All the active assignable nodes.
* @return A map of assignable replica set, <ResourceName, replica set>.
*/
private static Map<String, Set<AssignableReplica>> parseAllReplicas(
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
- int instanceCount) {
+ Set<AssignableNode> assignableNodes) {
Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
ClusterConfig clusterConfig = dataProvider.getClusterConfig();
@@ -211,8 +214,11 @@ public class ClusterModelProvider {
is.getStateModelDefRef(), resourceName));
}
+ int activeFaultZoneCount =
+ assignableNodes.stream().map(node -> node.getFaultZone()).collect(Collectors.toSet())
+ .size();
Map<String, Integer> stateCountMap =
- def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
+ def.getStateCountMap(activeFaultZoneCount, is.getReplicaCount(assignableNodes.size()));
for (String partition : is.getPartitionSet()) {
for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 76f1141..08143c6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -43,8 +43,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
Set<AssignableNode> nodeSet = new HashSet<>();
testCache.getInstanceConfigMap().values().stream()
.forEach(config -> nodeSet.add(new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
- Collections.emptyList())));
+ testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
return nodeSet;
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 92a6998..6975901 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -64,7 +64,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
expectedCapacityMap.put("item3", 30);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ assignableNode.assignNewBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
@@ -167,8 +168,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
ResourceControllerDataProvider testCache = setupClusterDataCache();
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
- Collections.emptyList());
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
"MASTER", 1);
@@ -183,7 +183,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ assignableNode.assignNewBatch(assignmentSet);
AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
assignableNode.assign(duplicateReplica);
@@ -206,8 +207,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
- Collections.emptyList());
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
}
@Test
@@ -227,8 +227,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
- Collections.emptyList());
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "2/");
@@ -245,8 +244,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
assignableNode = new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
- Collections.emptyList());
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
}
@@ -259,8 +257,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
AssignableNode assignableNode =
- new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
- Collections.emptyList());
+ new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
}
@@ -274,7 +271,6 @@ public class TestAssignableNode extends AbstractTestClusterModel {
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
- new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
- Collections.emptyList());
+ new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index a45b729..5112413 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -43,8 +43,7 @@ public class TestClusterModel extends AbstractTestClusterModel {
Set<AssignableNode> nodeSet = new HashSet<>();
testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add(
new AssignableNode(testCache.getClusterConfig(),
- testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
- Collections.emptyList())));
+ testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
return nodeSet;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 1ec92a9..ad608b6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -34,14 +42,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
@@ -111,7 +111,18 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
Assert.assertEquals(
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
.collect(Collectors.toSet()), _instances);
- // Shall have 2 resources and 12 replicas
+ // Shall have 2 resources and 4 replicas, since all nodes are in the same fault zone.
+ Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+ Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+ .allMatch(replicaSet -> replicaSet.size() == 4));
+
+ // Adjust instance fault zone, so they have different fault zones.
+ testCache.getInstanceConfigMap().values().stream()
+ .forEach(config -> config.setZoneId(config.getInstanceName()));
+ clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+ .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+ _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+ // Shall have 2 resources and 12 replicas after fault zone adjusted.
Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
.allMatch(replicaSet -> replicaSet.size() == 12));
@@ -197,10 +208,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
_instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
Collections.singleton(changedResourceName)), Collections.emptyMap(),
bestPossibleAssignment);
- // There should be no existing assignment for all the resource except for resource2.
+ // There should be no existing assignment for all the resource except for resource2
Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1);
Map<String, Set<String>> resourceAssignmentMap =
- clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
+ clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId);
// Should be only resource2 in the map
Assert.assertEquals(resourceAssignmentMap.size(), 1);
for (String resource : _resourceNames) {