You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:07 UTC
[helix] 29/50: PartitionMovementSoftConstraint Implementation (#474)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 37b07318dd10a1a78ddf26f3401978b8778c00fc
Author: Yi Wang <yw...@linkedin.com>
AuthorDate: Tue Sep 17 15:09:45 2019 -0700
PartitionMovementSoftConstraint Implementation (#474)
Add a soft constraint: the partition movement constraint.
Evaluate the proposed assignment according to the potential partition movement cost.
The cost is evaluated based on the difference between the old assignment and the new assignment.
---
.../constraints/PartitionMovementConstraint.java | 96 ++++++++++++++++
.../rebalancer/waged/model/ClusterContext.java | 43 +++++--
.../rebalancer/waged/model/ClusterModel.java | 27 +----
.../waged/model/ClusterModelProvider.java | 5 +-
.../TestPartitionMovementConstraint.java | 127 +++++++++++++++++++++
.../waged/model/ClusterModelTestHelper.java | 6 +-
.../rebalancer/waged/model/TestClusterContext.java | 17 +--
.../rebalancer/waged/model/TestClusterModel.java | 15 ++-
8 files changed, 278 insertions(+), 58 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
new file mode 100644
index 0000000..a781afc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -0,0 +1,96 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Soft constraint that scores a proposed (node, replica) assignment by its partition movement cost.
+ * The cost is evaluated based on the difference between the old assignment and the new assignment.
+ * In detail, we consider the following two previous assignments as the base.
+ * - Baseline assignment that is calculated regardless of the node state (online/offline).
+ * - Previous Best Possible assignment.
+ * Any deviation from these two assignments increases the partition movement cost, so that the
+ * evaluated score becomes lower. The Best Possible match carries the larger weight.
+ */
+class PartitionMovementConstraint extends SoftConstraint {
+ private static final float MAX_SCORE = 1f;
+ private static final float MIN_SCORE = 0f;
+ // TODO: these factors will be tuned based on the user's preference.
+ // Scale returned when the node hosted the partition in a previous assignment but in a state
+ // different from the proposed replica state (allocation matches, state does not).
+ private static final float ALLOCATION_MATCH_FACTOR = 0.5f;
+ // Weight of the Baseline match in the final score; the Best Possible match gets the remainder.
+ private static final float BASELINE_MATCH_FACTOR = 0.25f;
+
+ PartitionMovementConstraint() {
+ super(MAX_SCORE, MIN_SCORE);
+ }
+
+ @Override // Raw score in [0, 1]; 1 means both previous assignments match node and state.
+ protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ Map<String, String> bestPossibleStateMap =
+ getStateMap(replica, clusterContext.getBestPossibleAssignment());
+ Map<String, String> baselineStateMap =
+ getStateMap(replica, clusterContext.getBaselineAssignment());
+
+ // Prioritize the matching of the previous Best Possible assignment.
+ float scale = calculateAssignmentScale(node, replica, bestPossibleStateMap);
+ // Blend in the Baseline match. This is unconditional: a missing Baseline contributes 0.
+ scale = scale * (1 - BASELINE_MATCH_FACTOR)
+ + calculateAssignmentScale(node, replica, baselineStateMap) * BASELINE_MATCH_FACTOR;
+
+ return scale;
+ }
+
+ @Override // Maps a raw score in [0, 1] linearly onto [getMinScore(), getMaxScore()].
+ NormalizeFunction getNormalizeFunction() {
+ return score -> score * (getMaxScore() - getMinScore()) + getMinScore();
+ }
+
+ private Map<String, String> getStateMap(AssignableReplica replica,
+ Map<String, ResourceAssignment> assignment) {
+ String resourceName = replica.getResourceName();
+ String partitionName = replica.getPartitionName();
+ if (assignment == null || !assignment.containsKey(resourceName)) {
+ return Collections.emptyMap(); // no previous record for this resource
+ }
+ return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
+ }
+
+ private float calculateAssignmentScale(AssignableNode node, AssignableReplica replica,
+ Map<String, String> instanceToStateMap) {
+ String instanceName = node.getInstanceName();
+ if (!instanceToStateMap.containsKey(instanceName)) {
+ return 0; // the node is not part of this previous assignment of the partition
+ } else {
+ return (instanceToStateMap.get(instanceName).equals(replica.getReplicaState()) ? 1
+ : ALLOCATION_MATCH_FACTOR);
+ }
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index a0c841a..892cad3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -28,6 +28,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixException;
+import org.apache.helix.model.ResourceAssignment;
+
/**
* This class tracks the rebalance-related global cluster status.
@@ -44,30 +46,47 @@ public class ClusterContext {
// map{zoneName : map{resourceName : set(partitionNames)}}
private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+ // Records about the previous assignment
+ // <ResourceName, ResourceAssignment contains the baseline assignment>
+ private final Map<String, ResourceAssignment> _baselineAssignment;
+ // <ResourceName, ResourceAssignment contains the best possible assignment>
+ private final Map<String, ResourceAssignment> _bestPossibleAssignment;
/**
* Construct the cluster context based on the current instance status.
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param instanceCount The count of all the active instances that can be used to host partitions.
*/
- ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+ ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount,
+ Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
int totalReplicas = 0;
int totalTopStateReplicas = 0;
for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
- .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+ .collect(Collectors.groupingBy(AssignableReplica::getResourceName))
+ .entrySet()) {
int replicas = entry.getValue().size();
totalReplicas += replicas;
int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
_estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
- totalTopStateReplicas +=
- entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+ totalTopStateReplicas += entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
}
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+ _baselineAssignment = baselineAssignment;
+ _bestPossibleAssignment = bestPossibleAssignment;
+ }
+
+ public Map<String, ResourceAssignment> getBaselineAssignment() {
+ return _baselineAssignment == null || _baselineAssignment.isEmpty() ? Collections.emptyMap() : _baselineAssignment;
+ }
+
+ public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+ return _bestPossibleAssignment == null || _bestPossibleAssignment.isEmpty() ? Collections.emptyMap()
+ : _bestPossibleAssignment;
}
public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
@@ -93,25 +112,25 @@ public class ClusterContext {
void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
- .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
+ .computeIfAbsent(resourceName, k -> new HashSet<>())
+ .add(partition)) {
throw new HelixException(
- String.format("Resource %s already has a replica from partition %s in fault zone %s",
- resourceName, partition, faultZoneId));
+ String.format("Resource %s already has a replica from partition %s in fault zone %s", resourceName, partition,
+ faultZoneId));
}
}
boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
- .getOrDefault(resourceName, Collections.emptySet()).remove(partition);
+ .getOrDefault(resourceName, Collections.emptySet())
+ .remove(partition);
}
- void setAssignmentForFaultZoneMap(
- Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
+ void setAssignmentForFaultZoneMap(Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
_assignmentForFaultZoneMap = assignmentForFaultZoneMap;
}
private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
- return (int) Math
- .ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
+ return (int) Math.ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 6c4e67b..3d31c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -19,14 +19,13 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixException;
-import org.apache.helix.model.ResourceAssignment;
-
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.helix.HelixException;
+
/**
* This class wraps the required input for the rebalance algorithm.
*/
@@ -39,23 +38,14 @@ public class ClusterModel {
private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex;
private final Map<String, AssignableNode> _assignableNodeMap;
- // Records about the previous assignment
- // <ResourceName, ResourceAssignment contains the baseline assignment>
- private final Map<String, ResourceAssignment> _baselineAssignment;
- // <ResourceName, ResourceAssignment contains the best possible assignment>
- private final Map<String, ResourceAssignment> _bestPossibleAssignment;
-
/**
* @param clusterContext The initialized cluster context.
* @param assignableReplicas The replicas to be assigned.
* Note that the replicas in this list shall not be included while initializing the context and assignable nodes.
* @param assignableNodes The active instances.
- * @param baselineAssignment The recorded baseline assignment.
- * @param bestPossibleAssignment The current best possible assignment.
*/
ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas,
- Set<AssignableNode> assignableNodes, Map<String, ResourceAssignment> baselineAssignment,
- Map<String, ResourceAssignment> bestPossibleAssignment) {
+ Set<AssignableNode> assignableNodes) {
_clusterContext = clusterContext;
// Save all the to be assigned replication
@@ -70,9 +60,6 @@ public class ClusterModel {
_assignableNodeMap = assignableNodes.stream()
.collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node));
-
- _baselineAssignment = baselineAssignment;
- _bestPossibleAssignment = bestPossibleAssignment;
}
public ClusterContext getContext() {
@@ -87,14 +74,6 @@ public class ClusterModel {
return _assignableReplicaMap;
}
- public Map<String, ResourceAssignment> getBaseline() {
- return _baselineAssignment;
- }
-
- public Map<String, ResourceAssignment> getBestPossibleAssignment() {
- return _bestPossibleAssignment;
- }
-
/**
* Assign the given replica to the specified instance and record the assignment in the cluster model.
* The cluster usage information will be updated accordingly.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 20024c7..af1a8d8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -86,12 +86,11 @@ public class ClusterModelProvider {
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
- activeInstances.size());
+ activeInstances.size(), baselineAssignment, bestPossibleAssignment);
// Initial the cluster context with the allocated assignments.
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
- return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
- bestPossibleAssignment);
+ return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
new file mode 100644
index 0000000..d3af35e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -0,0 +1,127 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestPartitionMovementConstraint {
+ private static final String INSTANCE = "TestInstance";
+ private static final String RESOURCE = "TestResource";
+ private static final String PARTITION = "TestPartition";
+ private AssignableNode _testNode;
+ private AssignableReplica _testReplica;
+ private ClusterContext _clusterContext;
+ private SoftConstraint _constraint = new PartitionMovementConstraint();
+
+ @BeforeMethod // Recreate the mocks before each test method so stubbing does not carry over.
+ public void init() {
+ _testNode = mock(AssignableNode.class);
+ _testReplica = mock(AssignableReplica.class);
+ _clusterContext = mock(ClusterContext.class);
+ when(_testReplica.getResourceName()).thenReturn(RESOURCE);
+ when(_testReplica.getPartitionName()).thenReturn(PARTITION);
+ when(_testNode.getInstanceName()).thenReturn(INSTANCE);
+ }
+
+ @Test
+ public void testGetAssignmentScoreWhenBestPossibleBaselineMissing() {
+ when(_clusterContext.getBaselineAssignment()).thenReturn(Collections.emptyMap());
+ when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
+ float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+ float normalizedScore =
+ _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+ Assert.assertEquals(score, 0f);
+ Assert.assertEquals(normalizedScore, 0f);
+ }
+
+ @Test
+ public void testGetAssignmentScoreWhenBestPossibleBaselineSame() {
+ ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
+ when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+ .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
+ Map<String, ResourceAssignment> assignmentMap =
+ ImmutableMap.of(RESOURCE, mockResourceAssignment);
+ when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
+ when(_clusterContext.getBestPossibleAssignment()).thenReturn(assignmentMap);
+ // Both previous assignments record the proposed state (Master) on this node: full score.
+ when(_testReplica.getReplicaState()).thenReturn("Master");
+ float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+ float normalizedScore =
+ _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+ Assert.assertEquals(score, 1f);
+ Assert.assertEquals(normalizedScore, 1f);
+ // Both previous assignments record a different state on this node: allocation-only match.
+ when(_testReplica.getReplicaState()).thenReturn("Slave");
+ score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+ normalizedScore =
+ _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+ Assert.assertEquals(score, 0.5f);
+ Assert.assertEquals(normalizedScore, 0.5f);
+ }
+
+ @Test
+ public void testGetAssignmentScoreWhenBestPossibleBaselineOpposite() {
+ ResourceAssignment bestPossibleResourceAssignment = mock(ResourceAssignment.class);
+ when(bestPossibleResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+ .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
+ ResourceAssignment baselineResourceAssignment = mock(ResourceAssignment.class);
+ when(baselineResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+ .thenReturn(ImmutableMap.of(INSTANCE, "Slave"));
+ when(_clusterContext.getBaselineAssignment())
+ .thenReturn(ImmutableMap.of(RESOURCE, baselineResourceAssignment));
+ when(_clusterContext.getBestPossibleAssignment())
+ .thenReturn(ImmutableMap.of(RESOURCE, bestPossibleResourceAssignment));
+ // The proposed state matches Best Possible only: 1 * 0.75 + 0.5 * 0.25 = 0.875.
+ when(_testReplica.getReplicaState()).thenReturn("Master");
+ float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+ float normalizedScore =
+ _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+ Assert.assertEquals(score, 0.875f);
+ Assert.assertEquals(normalizedScore, 0.875f);
+ // The proposed state matches Baseline only: 0.5 * 0.75 + 1 * 0.25 = 0.625.
+ when(_testReplica.getReplicaState()).thenReturn("Slave");
+ score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+ normalizedScore =
+ _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+ // The score is lower than the previous 0.875f because matching the Best Possible assignment
+ // is weighted more heavily than matching the Baseline.
+ Assert.assertEquals(score, 0.625f);
+ Assert.assertEquals(normalizedScore, 0.625f);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 08143c6..585c26f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -34,9 +34,9 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
Set<AssignableNode> assignableNodes = generateNodes(testCache);
- ClusterContext context = new ClusterContext(assignableReplicas, 2);
- return new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
- Collections.emptyMap());
+ ClusterContext context =
+ new ClusterContext(assignableReplicas, 2, Collections.emptyMap(), Collections.emptyMap());
+ return new ClusterModel(context, assignableReplicas, assignableNodes);
}
private Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 8206f29..d8b93c0 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -19,18 +19,19 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
public class TestClusterContext extends AbstractTestClusterModel {
@BeforeClass
public void initialize() {
@@ -43,7 +44,7 @@ public class TestClusterContext extends AbstractTestClusterModel {
ResourceControllerDataProvider testCache = setupClusterDataCache();
Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
- ClusterContext context = new ClusterContext(assignmentSet, 2);
+ ClusterContext context = new ClusterContext(assignmentSet, 2, new HashMap<>(), new HashMap<>());
// Note that we left some margin for the max estimation.
Assert.assertEquals(context.getEstimatedMaxPartitionCount(), 3);
@@ -80,7 +81,7 @@ public class TestClusterContext extends AbstractTestClusterModel {
public void testDuplicateAssign() throws IOException {
ResourceControllerDataProvider testCache = setupClusterDataCache();
Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
- ClusterContext context = new ClusterContext(assignmentSet, 2);
+ ClusterContext context = new ClusterContext(assignmentSet, 2, new HashMap<>(), new HashMap<>());
context
.addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
// Insert again and trigger the error.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index 5112413..12146b7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -19,17 +19,17 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
public class TestClusterModel extends AbstractTestClusterModel {
@BeforeClass
public void initialize() {
@@ -54,10 +54,9 @@ public class TestClusterModel extends AbstractTestClusterModel {
Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
Set<AssignableNode> assignableNodes = generateNodes(testCache);
- ClusterContext context = new ClusterContext(assignableReplicas, 2);
+ ClusterContext context = new ClusterContext(assignableReplicas, 2, Collections.emptyMap(), Collections.emptyMap());
ClusterModel clusterModel =
- new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
- Collections.emptyMap());
+ new ClusterModel(context, assignableReplicas, assignableNodes);
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
.allMatch(resourceMap -> resourceMap.values().isEmpty()));