You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:21 UTC

[helix] 18/37: Validate the instance capacity/partition weight configuration while constructing the assignable instances (#451)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a4b61eeeb74cd3363a281214dab11bb53ec013d8
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Sep 6 19:17:19 2019 -0700

    Validate the instance capacity/partition weight configuration while constructing the assignable instances (#451)
    
    Compare the configure items with the required capacity keys that are defined in the cluster config when build the assignable instances.
    - According to the design, all the required capacity keys must appear in the instance capacity config.
    - As for the partition weights, the corresponding weight item will be filled with value 0 if the required capacity key is not specified in the resource config.
---
 .../rebalancer/waged/model/AssignableNode.java     | 41 ++++++++++++----
 .../rebalancer/waged/model/AssignableReplica.java  | 27 ++++++++---
 .../waged/model/ClusterModelProvider.java          | 31 ++++++-------
 .../waged/model/AbstractTestClusterModel.java      | 23 ++++-----
 .../rebalancer/waged/model/TestAssignableNode.java | 53 +++++++++++++--------
 .../waged/model/TestAssignableReplica.java         | 54 +++++++++++++++++-----
 6 files changed, 153 insertions(+), 76 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 35c3c38..33677e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,12 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.InstanceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,6 +29,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static java.lang.Math.max;
 
 /**
@@ -91,10 +91,7 @@ public class AssignableNode {
       Collection<AssignableReplica> existingAssignment) {
     reset();
 
-    Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
-    if (instanceCapacity.isEmpty()) {
-      instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
-    }
+    Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
     _currentCapacityMap.putAll(instanceCapacity);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
     _instanceTags = new HashSet<>(instanceConfig.getTags());
@@ -213,6 +210,7 @@ public class AssignableNode {
    * The method dynamically returns the highest utilization number among all the capacity categories.
    * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
    * return 0.9.
+   *
    * @return The highest utilization number of the node among all the capacity category.
    */
   public float getHighestCapacityUtilization() {
@@ -359,6 +357,29 @@ public class AssignableNode {
     // a NOP; in other words, this node will be treated as if it has unlimited capacity.
   }
 
+  /**
+   * Get and validate the instance capacity from instance config.
+   *
+   * @throws HelixException if any required capacity key is not configured in the instance config.
+   */
+  private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
+      InstanceConfig instanceConfig) {
+    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+    Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
+    if (instanceCapacity.isEmpty()) {
+      instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
+    }
+    // Remove all the non-required capacity items from the map.
+    instanceCapacity.keySet().retainAll(requiredCapacityKeys);
+    // All the required keys must exist in the instance config.
+    if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
+      throw new HelixException(String.format(
+          "The required capacity keys %s are not fully configured in the instance %s capacity map %s.",
+          requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
+    }
+    return instanceCapacity;
+  }
+
   @Override
   public int hashCode() {
     return _instanceName.hashCode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index ade04bf..537bf70 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,12 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
 /**
  * This class represents a partition replication that needs to be allocated.
  */
@@ -40,18 +42,19 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
   private final String _replicaState;
 
   /**
+   * @param clusterConfig  The cluster config.
    * @param resourceConfig The resource config for the resource which contains the replication.
    * @param partitionName  The replication's partition name.
    * @param replicaState   The state of the replication.
    * @param statePriority  The priority of the replication's state.
    */
-  AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
-      int statePriority) {
+  AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
+      String partitionName, String replicaState, int statePriority) {
     _partitionName = partitionName;
     _replicaState = replicaState;
     _statePriority = statePriority;
     _resourceName = resourceConfig.getResourceName();
-    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
     _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
     _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
   }
@@ -127,7 +130,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
    * Parse the resource config for the partition weight.
    */
   private Map<String, Integer> fetchCapacityUsage(String partitionName,
-      ResourceConfig resourceConfig) {
+      ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
     Map<String, Map<String, Integer>> capacityMap;
     try {
       capacityMap = resourceConfig.getPartitionCapacityMap();
@@ -146,6 +149,16 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
           "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.",
           partitionName, resourceConfig.getResourceName()));
     }
+
+    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+    // Remove the non-required capacity items.
+    partitionCapacity.keySet().retainAll(requiredCapacityKeys);
+    // If any required capacity key is not configured in the resource config, fill the partition
+    // capacity map with 0 usage.
+    for (String capacityKey : requiredCapacityKeys) {
+      partitionCapacity.putIfAbsent(capacityKey, 0);
+    }
+
     return partitionCapacity;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 61f5d8d..3570164 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -30,14 +38,6 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * This util class generates Cluster Model object based on the controller's data cache.
  */
@@ -112,8 +112,8 @@ public class ClusterModelProvider {
       Map<String, ResourceAssignment> bestPossibleAssignment,
       Map<String, Set<AssignableReplica>> allocatedReplicas) {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
-    if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG)
-        || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+    if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges
+        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
       // If the cluster topology has been modified, need to reassign all replicas
       toBeAssignedReplicas
           .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
@@ -125,11 +125,9 @@ public class ClusterModelProvider {
         // 2. if the resource does appear in the best possible assignment, need to reassign.
         if (clusterChanges
             .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
-            .contains(resourceName)
-            || clusterChanges
+            .contains(resourceName) || clusterChanges
             .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
-            .contains(resourceName)
-            || !bestPossibleAssignment.containsKey(resourceName)) {
+            .contains(resourceName) || !bestPossibleAssignment.containsKey(resourceName)) {
           toBeAssignedReplicas.addAll(replicas);
           continue; // go to check next resource
         } else {
@@ -196,9 +194,10 @@ public class ClusterModelProvider {
       ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
       int instanceCount) {
     Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
+    ClusterConfig clusterConfig = dataProvider.getClusterConfig();
 
     for (String resourceName : resourceMap.keySet()) {
-      ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+      ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName);
       IdealState is = dataProvider.getIdealState(resourceName);
       if (is == null) {
         throw new HelixException(
@@ -220,7 +219,7 @@ public class ClusterModelProvider {
           String state = entry.getKey();
           for (int i = 0; i < entry.getValue(); i++) {
             totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
-                new AssignableReplica(config, partition, state,
+                new AssignableReplica(clusterConfig, resourceConfig, partition, state,
                     def.getStatePriorityMap().get(state)));
           }
         }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index d99a3fb..a8a5de5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -19,6 +19,15 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -29,15 +38,6 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -104,6 +104,7 @@ public abstract class AbstractTestClusterModel {
     testClusterConfig.setMaxPartitionsPerInstance(5);
     testClusterConfig.setDisabledInstances(Collections.emptyMap());
     testClusterConfig.setTopologyAwareEnabled(false);
+    testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     // 3. Mock the live instance node for the default instance.
@@ -179,8 +180,8 @@ public abstract class AbstractTestClusterModel {
       ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
       // Construct one AssignableReplica for each partition in the current state.
       cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
-          new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(),
-              entry.getValue().equals("MASTER") ? 1 : 2)));
+          new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(),
+              entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
     }
     return assignmentSet;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 34a03a9..92a6998 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -19,22 +19,24 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.InstanceConfig;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
 import static org.mockito.Mockito.when;
 
 public class TestAssignableNode extends AbstractTestClusterModel {
@@ -87,9 +89,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
         expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
 
     // Test 2 - release assignment from the AssignableNode
-    AssignableReplica removingReplica =
-        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
-            _partitionNames.get(2), "MASTER", 1);
+    AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
+        testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "MASTER", 1);
     expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 9);
     expectedCapacityMap.put("item2", 18);
@@ -128,9 +129,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
         expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
 
     // Test 3 - add assignment to the AssignableNode
-    AssignableReplica addingReplica =
-        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
-            _partitionNames.get(2), "SLAVE", 2);
+    AssignableReplica addingReplica = new AssignableReplica(testCache.getClusterConfig(),
+        testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "SLAVE", 2);
     expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 4);
     expectedCapacityMap.put("item2", 8);
@@ -169,9 +169,9 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
         Collections.emptyList());
-    AssignableReplica removingReplica =
-        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
-            _partitionNames.get(2) + "non-exist", "MASTER", 1);
+    AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
+        testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
+        "MASTER", 1);
 
     // Release shall pass.
     assignableNode.release(removingReplica);
@@ -184,9 +184,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
-    AssignableReplica duplicateReplica =
-        new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)),
-            _partitionNames.get(0), "SLAVE", 2);
+    AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
+        testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
     assignableNode.assign(duplicateReplica);
   }
 
@@ -264,4 +263,18 @@ public class TestAssignableNode extends AbstractTestClusterModel {
             Collections.emptyList());
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
   }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance testInstanceId capacity map \\{item2=40, item1=20, item3=30\\}.")
+  public void testIncompleteInstanceCapacity() {
+    ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+    List<String> requiredCapacityKeys = new ArrayList<>(_capacityDataMap.keySet());
+    requiredCapacityKeys.add("AdditionalCapacityKey");
+    testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
+
+    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+
+    new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
+        Collections.emptyList());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
index d069ced..a247537 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
@@ -19,16 +19,19 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 public class TestAssignableReplica {
   String resourceName = "Resource";
   String partitionNamePrefix = "partition";
@@ -38,7 +41,7 @@ public class TestAssignableReplica {
   int slavePriority = 2;
 
   @Test
-  public void testConstructRepliaWithResourceConfig() throws IOException {
+  public void testConstructReplicaWithResourceConfig() throws IOException {
     // Init assignable replica with a basic config object
     Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
     capacityDataMapResource1.put("item1", 3);
@@ -46,11 +49,13 @@ public class TestAssignableReplica {
     ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
     testResourceConfigResource.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+    ClusterConfig testClusterConfig = new ClusterConfig("testCluster");
+    testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(capacityDataMapResource1.keySet()));
 
     String partitionName = partitionNamePrefix + 1;
     AssignableReplica replica =
-        new AssignableReplica(testResourceConfigResource, partitionName, masterState,
-            masterPriority);
+        new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName,
+            masterState, masterPriority);
     Assert.assertEquals(replica.getResourceName(), resourceName);
     Assert.assertEquals(replica.getPartitionName(), partitionName);
     Assert.assertEquals(replica.getReplicaState(), masterState);
@@ -79,14 +84,14 @@ public class TestAssignableReplica {
         .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
             maxPartition);
 
-    replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState,
-        masterPriority);
+    replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName,
+        masterState, masterPriority);
     Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
     Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
     Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
 
-    replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState,
-        slavePriority);
+    replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName2,
+        slaveState, slavePriority);
     Assert.assertEquals(replica.getResourceName(), resourceName);
     Assert.assertEquals(replica.getPartitionName(), partitionName2);
     Assert.assertEquals(replica.getReplicaState(), slaveState);
@@ -96,4 +101,29 @@ public class TestAssignableReplica {
     Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
     Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
   }
+
+  @Test
+  public void testIncompletePartitionWeightConfig() throws IOException {
+    // Init assignable replica with a basic config object
+    Map<String, Integer> capacityDataMapResource = new HashMap<>();
+    capacityDataMapResource.put("item1", 3);
+    capacityDataMapResource.put("item2", 6);
+    ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
+    testResourceConfigResource.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource));
+    ClusterConfig testClusterConfig = new ClusterConfig("testCluster");
+    List<String> requiredCapacityKeys = new ArrayList<>(capacityDataMapResource.keySet());
+    // Remove one required key, so it becomes a unnecessary item.
+    String unnecessaryCapacityKey = requiredCapacityKeys.remove(0);
+    // Add one new required key, so it does not exist in the resource config.
+    String newCapacityKey = "newCapacityKey";
+    requiredCapacityKeys.add(newCapacityKey);
+    testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
+
+    AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
+        partitionNamePrefix + 1, masterState, masterPriority);
+    Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys));
+    Assert.assertEquals(replica.getCapacity().get(newCapacityKey).intValue(), 0);
+    Assert.assertFalse(replica.getCapacity().containsKey(unnecessaryCapacityKey));
+  }
 }