You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:32 UTC

[helix] 29/37: PartitionMovementSoftConstraint Implementation (#474)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 50e8ceccd35dfa791b066ff85e8e6a78555511b8
Author: Yi Wang <yw...@linkedin.com>
AuthorDate: Tue Sep 17 15:09:45 2019 -0700

    PartitionMovementSoftConstraint Implementation (#474)
    
    Add soft constraint: partition movement constraint
    
    Evaluate the proposed assignment according to the potential partition movements cost.
    The cost is evaluated based on the difference between the old assignment and the new assignment.
---
 .../constraints/PartitionMovementConstraint.java   |  96 ++++++++++++++++
 .../rebalancer/waged/model/ClusterContext.java     |  43 +++++--
 .../rebalancer/waged/model/ClusterModel.java       |  27 +----
 .../waged/model/ClusterModelProvider.java          |   5 +-
 .../TestPartitionMovementConstraint.java           | 127 +++++++++++++++++++++
 .../waged/model/ClusterModelTestHelper.java        |   6 +-
 .../rebalancer/waged/model/TestClusterContext.java |  17 +--
 .../rebalancer/waged/model/TestClusterModel.java   |  15 ++-
 8 files changed, 278 insertions(+), 58 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
new file mode 100644
index 0000000..a781afc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -0,0 +1,96 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Evaluate the proposed assignment according to the potential partition movements cost.
+ * The cost is evaluated based on the difference between the old assignment and the new assignment.
+ * In detail, we consider the following two previous assignments as the base.
+ * - Baseline assignment that is calculated regardless of the node state (online/offline).
+ * - Previous Best Possible assignment.
+ * Any deviation of the proposed assignment from these two assignments increases the partition
+ * movement cost, so the evaluated score becomes lower.
+ */
+class PartitionMovementConstraint extends SoftConstraint {
+  private static final float MAX_SCORE = 1f;
+  private static final float MIN_SCORE = 0f;
+  //TODO: these factors will be tuned based on user's preference
+  // This factor indicates the default score that is evaluated if only partition allocation matches
+  // (states are different).
+  private static final float ALLOCATION_MATCH_FACTOR = 0.5f;
+  // This factor indicates the contribution of the Baseline assignment matching to the final score.
+  private static final float BASELINE_MATCH_FACTOR = 0.25f;
+
+  PartitionMovementConstraint() {
+    super(MAX_SCORE, MIN_SCORE);
+  }
+
+  @Override
+  protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    Map<String, String> bestPossibleStateMap =
+        getStateMap(replica, clusterContext.getBestPossibleAssignment());
+    Map<String, String> baselineStateMap =
+        getStateMap(replica, clusterContext.getBaselineAssignment());
+
+    // Prioritize the matching of the previous Best Possible assignment.
+    float scale = calculateAssignmentScale(node, replica, bestPossibleStateMap);
+    // If the baseline is also provided, adjust the final score accordingly.
+    scale = scale * (1 - BASELINE_MATCH_FACTOR)
+        + calculateAssignmentScale(node, replica, baselineStateMap) * BASELINE_MATCH_FACTOR;
+
+    return scale;
+  }
+
+  @Override
+  NormalizeFunction getNormalizeFunction() {
+    return score -> score * (getMaxScore() - getMinScore()) + getMinScore();
+  }
+
+  private Map<String, String> getStateMap(AssignableReplica replica,
+      Map<String, ResourceAssignment> assignment) {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
+    if (assignment == null || !assignment.containsKey(resourceName)) {
+      return Collections.emptyMap();
+    }
+    return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
+  }
+
+  private float calculateAssignmentScale(AssignableNode node, AssignableReplica replica,
+      Map<String, String> instanceToStateMap) {
+    String instanceName = node.getInstanceName();
+    if (!instanceToStateMap.containsKey(instanceName)) {
+      return 0;
+    } else {
+      return (instanceToStateMap.get(instanceName).equals(replica.getReplicaState()) ? 1
+          : ALLOCATION_MATCH_FACTOR);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index a0c841a..892cad3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.model.ResourceAssignment;
+
 
 /**
  * This class tracks the rebalance-related global cluster status.
@@ -44,30 +46,47 @@ public class ClusterContext {
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+  // Records about the previous assignment
+  // <ResourceName, ResourceAssignment contains the baseline assignment>
+  private final Map<String, ResourceAssignment> _baselineAssignment;
+  // <ResourceName, ResourceAssignment contains the best possible assignment>
+  private final Map<String, ResourceAssignment> _bestPossibleAssignment;
 
   /**
    * Construct the cluster context based on the current instance status.
    * @param replicaSet All the partition replicas that are managed by the rebalancer
    * @param instanceCount The count of all the active instances that can be used to host partitions.
    */
-  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+  ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount,
+      Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
-        .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+        .collect(Collectors.groupingBy(AssignableReplica::getResourceName))
+        .entrySet()) {
       int replicas = entry.getValue().size();
       totalReplicas += replicas;
 
       int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
       _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
 
-      totalTopStateReplicas +=
-          entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+      totalTopStateReplicas += entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
     }
 
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+    _baselineAssignment = baselineAssignment;
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  public Map<String, ResourceAssignment> getBaselineAssignment() {
+    return _baselineAssignment == null || _baselineAssignment.isEmpty() ? Collections.emptyMap() : _baselineAssignment;
+  }
+
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    return _bestPossibleAssignment == null || _bestPossibleAssignment.isEmpty() ? Collections.emptyMap()
+        : _bestPossibleAssignment;
   }
 
   public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
@@ -93,25 +112,25 @@ public class ClusterContext {
 
   void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
     if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
-        .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
+        .computeIfAbsent(resourceName, k -> new HashSet<>())
+        .add(partition)) {
       throw new HelixException(
-          String.format("Resource %s already has a replica from partition %s in fault zone %s",
-              resourceName, partition, faultZoneId));
+          String.format("Resource %s already has a replica from partition %s in fault zone %s", resourceName, partition,
+              faultZoneId));
     }
   }
 
   boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
-        .getOrDefault(resourceName, Collections.emptySet()).remove(partition);
+        .getOrDefault(resourceName, Collections.emptySet())
+        .remove(partition);
   }
 
-  void setAssignmentForFaultZoneMap(
-      Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
+  void setAssignmentForFaultZoneMap(Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
     _assignmentForFaultZoneMap = assignmentForFaultZoneMap;
   }
 
   private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
-    return (int) Math
-        .ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
+    return (int) Math.ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 6c4e67b..3d31c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -19,14 +19,13 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-import org.apache.helix.model.ResourceAssignment;
-
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.HelixException;
+
 /**
  * This class wraps the required input for the rebalance algorithm.
  */
@@ -39,23 +38,14 @@ public class ClusterModel {
   private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex;
   private final Map<String, AssignableNode> _assignableNodeMap;
 
-  // Records about the previous assignment
-  // <ResourceName, ResourceAssignment contains the baseline assignment>
-  private final Map<String, ResourceAssignment> _baselineAssignment;
-  // <ResourceName, ResourceAssignment contains the best possible assignment>
-  private final Map<String, ResourceAssignment> _bestPossibleAssignment;
-
   /**
    * @param clusterContext         The initialized cluster context.
    * @param assignableReplicas     The replicas to be assigned.
    *                               Note that the replicas in this list shall not be included while initializing the context and assignable nodes.
    * @param assignableNodes        The active instances.
-   * @param baselineAssignment     The recorded baseline assignment.
-   * @param bestPossibleAssignment The current best possible assignment.
    */
   ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas,
-      Set<AssignableNode> assignableNodes, Map<String, ResourceAssignment> baselineAssignment,
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
+      Set<AssignableNode> assignableNodes) {
     _clusterContext = clusterContext;
 
     // Save all the to be assigned replication
@@ -70,9 +60,6 @@ public class ClusterModel {
 
     _assignableNodeMap = assignableNodes.stream()
         .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node));
-
-    _baselineAssignment = baselineAssignment;
-    _bestPossibleAssignment = bestPossibleAssignment;
   }
 
   public ClusterContext getContext() {
@@ -87,14 +74,6 @@ public class ClusterModel {
     return _assignableReplicaMap;
   }
 
-  public Map<String, ResourceAssignment> getBaseline() {
-    return _baselineAssignment;
-  }
-
-  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
-    return _bestPossibleAssignment;
-  }
-
   /**
    * Assign the given replica to the specified instance and record the assignment in the cluster model.
    * The cluster usage information will be updated accordingly.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 20024c7..af1a8d8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -86,12 +86,11 @@ public class ClusterModelProvider {
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
         replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
-        activeInstances.size());
+        activeInstances.size(), baselineAssignment, bestPossibleAssignment);
     // Initial the cluster context with the allocated assignments.
     context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
 
-    return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
-        bestPossibleAssignment);
+    return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
new file mode 100644
index 0000000..d3af35e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -0,0 +1,127 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestPartitionMovementConstraint {
+  private static final String INSTANCE = "TestInstance";
+  private static final String RESOURCE = "TestResource";
+  private static final String PARTITION = "TestPartition";
+  private AssignableNode _testNode;
+  private AssignableReplica _testReplica;
+  private ClusterContext _clusterContext;
+  private SoftConstraint _constraint = new PartitionMovementConstraint();
+
+  @BeforeMethod
+  public void init() {
+    _testNode = mock(AssignableNode.class);
+    _testReplica = mock(AssignableReplica.class);
+    _clusterContext = mock(ClusterContext.class);
+    when(_testReplica.getResourceName()).thenReturn(RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(PARTITION);
+    when(_testNode.getInstanceName()).thenReturn(INSTANCE);
+  }
+
+  @Test
+  public void testGetAssignmentScoreWhenBestPossibleBaselineMissing() {
+    when(_clusterContext.getBaselineAssignment()).thenReturn(Collections.emptyMap());
+    when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
+    float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    float normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertEquals(score, 0f);
+    Assert.assertEquals(normalizedScore, 0f);
+  }
+
+  @Test
+  public void testGetAssignmentScoreWhenBestPossibleBaselineSame() {
+    ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
+    when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+        .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
+    Map<String, ResourceAssignment> assignmentMap =
+        ImmutableMap.of(RESOURCE, mockResourceAssignment);
+    when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
+    when(_clusterContext.getBestPossibleAssignment()).thenReturn(assignmentMap);
+    // when the calculated states are both equal to the replica's current state
+    when(_testReplica.getReplicaState()).thenReturn("Master");
+    float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    float normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+    Assert.assertEquals(score, 1f);
+    Assert.assertEquals(normalizedScore, 1f);
+    // when the calculated states are both different from the replica's current state
+    when(_testReplica.getReplicaState()).thenReturn("Slave");
+    score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+    Assert.assertEquals(score, 0.5f);
+    Assert.assertEquals(normalizedScore, 0.5f);
+  }
+
+  @Test
+  public void testGetAssignmentScoreWhenBestPossibleBaselineOpposite() {
+    ResourceAssignment bestPossibleResourceAssignment = mock(ResourceAssignment.class);
+    when(bestPossibleResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+        .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
+    ResourceAssignment baselineResourceAssignment = mock(ResourceAssignment.class);
+    when(baselineResourceAssignment.getReplicaMap(new Partition(PARTITION)))
+        .thenReturn(ImmutableMap.of(INSTANCE, "Slave"));
+    when(_clusterContext.getBaselineAssignment())
+        .thenReturn(ImmutableMap.of(RESOURCE, baselineResourceAssignment));
+    when(_clusterContext.getBestPossibleAssignment())
+        .thenReturn(ImmutableMap.of(RESOURCE, bestPossibleResourceAssignment));
+    // when the replica's state matches with best possible only
+    when(_testReplica.getReplicaState()).thenReturn("Master");
+    float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    float normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+    Assert.assertEquals(score, 0.875f);
+    Assert.assertEquals(normalizedScore, 0.875f);
+    // when the replica's state matches with baseline only
+    when(_testReplica.getReplicaState()).thenReturn("Slave");
+    score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
+    normalizedScore =
+        _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+
+    // The calculated score is lower than the previous 0.875f because matching the replica's
+    // state with the best possible assignment is preferred over matching the baseline.
+    Assert.assertEquals(score, 0.625f);
+    Assert.assertEquals(normalizedScore, 0.625f);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 08143c6..585c26f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -34,9 +34,9 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
     Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
     Set<AssignableNode> assignableNodes = generateNodes(testCache);
 
-    ClusterContext context = new ClusterContext(assignableReplicas, 2);
-    return new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
-        Collections.emptyMap());
+    ClusterContext context =
+        new ClusterContext(assignableReplicas, 2, Collections.emptyMap(), Collections.emptyMap());
+    return new ClusterModel(context, assignableReplicas, assignableNodes);
   }
 
   private Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
index 8206f29..d8b93c0 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -19,18 +19,19 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
 public class TestClusterContext extends AbstractTestClusterModel {
   @BeforeClass
   public void initialize() {
@@ -43,7 +44,7 @@ public class TestClusterContext extends AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
-    ClusterContext context = new ClusterContext(assignmentSet, 2);
+    ClusterContext context = new ClusterContext(assignmentSet, 2, new HashMap<>(), new HashMap<>());
 
     // Note that we left some margin for the max estimation.
     Assert.assertEquals(context.getEstimatedMaxPartitionCount(), 3);
@@ -80,7 +81,7 @@ public class TestClusterContext extends AbstractTestClusterModel {
   public void testDuplicateAssign() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
-    ClusterContext context = new ClusterContext(assignmentSet, 2);
+    ClusterContext context = new ClusterContext(assignmentSet, 2, new HashMap<>(), new HashMap<>());
     context
         .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
     // Insert again and trigger the error.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index 5112413..12146b7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -19,17 +19,17 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 public class TestClusterModel extends AbstractTestClusterModel {
   @BeforeClass
   public void initialize() {
@@ -54,10 +54,9 @@ public class TestClusterModel extends AbstractTestClusterModel {
     Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
     Set<AssignableNode> assignableNodes = generateNodes(testCache);
 
-    ClusterContext context = new ClusterContext(assignableReplicas, 2);
+    ClusterContext context = new ClusterContext(assignableReplicas, 2, Collections.emptyMap(), Collections.emptyMap());
     ClusterModel clusterModel =
-        new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
-            Collections.emptyMap());
+        new ClusterModel(context, assignableReplicas, assignableNodes);
 
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.values().isEmpty()));