You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:21 UTC
[helix] 18/37: Validate the instance capacity/partition weight
configuration while constructing the assignable instances (#451)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit a4b61eeeb74cd3363a281214dab11bb53ec013d8
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Sep 6 19:17:19 2019 -0700
Validate the instance capacity/partition weight configuration while constructing the assignable instances (#451)
Compare the configured items with the required capacity keys that are defined in the cluster config when building the assignable instances.
- According to the design, all the required capacity keys must appear in the instance capacity config.
- As for the partition weights, the corresponding weight item will be filled with value 0 if the required capacity key is not specified in the resource config.
---
.../rebalancer/waged/model/AssignableNode.java | 41 ++++++++++++----
.../rebalancer/waged/model/AssignableReplica.java | 27 ++++++++---
.../waged/model/ClusterModelProvider.java | 31 ++++++-------
.../waged/model/AbstractTestClusterModel.java | 23 ++++-----
.../rebalancer/waged/model/TestAssignableNode.java | 53 +++++++++++++--------
.../waged/model/TestAssignableReplica.java | 54 +++++++++++++++++-----
6 files changed, 153 insertions(+), 76 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 35c3c38..33677e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,12 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixException;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.InstanceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -35,6 +29,12 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static java.lang.Math.max;
/**
@@ -91,10 +91,7 @@ public class AssignableNode {
Collection<AssignableReplica> existingAssignment) {
reset();
- Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
- if (instanceCapacity.isEmpty()) {
- instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
- }
+ Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
_currentCapacityMap.putAll(instanceCapacity);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
_instanceTags = new HashSet<>(instanceConfig.getTags());
@@ -213,6 +210,7 @@ public class AssignableNode {
* The method dynamically returns the highest utilization number among all the capacity categories.
* For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
* return 0.9.
+ *
* @return The highest utilization number of the node among all the capacity category.
*/
public float getHighestCapacityUtilization() {
@@ -359,6 +357,29 @@ public class AssignableNode {
// a NOP; in other words, this node will be treated as if it has unlimited capacity.
}
+ /**
+ * Get and validate the instance capacity from instance config.
+ *
+ * @throws HelixException if any required capacity key is not configured in the instance config.
+ */
+ private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
+ InstanceConfig instanceConfig) {
+ List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+ Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
+ if (instanceCapacity.isEmpty()) {
+ instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
+ }
+ // Remove all the non-required capacity items from the map.
+ instanceCapacity.keySet().retainAll(requiredCapacityKeys);
+ // All the required keys must exist in the instance config.
+ if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
+ throw new HelixException(String.format(
+ "The required capacity keys %s are not fully configured in the instance %s capacity map %s.",
+ requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
+ }
+ return instanceCapacity;
+ }
+
@Override
public int hashCode() {
return _instanceName.hashCode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index ade04bf..537bf70 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,12 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
/**
* This class represents a partition replication that needs to be allocated.
*/
@@ -40,18 +42,19 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
private final String _replicaState;
/**
+ * @param clusterConfig The cluster config.
* @param resourceConfig The resource config for the resource which contains the replication.
* @param partitionName The replication's partition name.
* @param replicaState The state of the replication.
* @param statePriority The priority of the replication's state.
*/
- AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
- int statePriority) {
+ AssignableReplica(ClusterConfig clusterConfig, ResourceConfig resourceConfig,
+ String partitionName, String replicaState, int statePriority) {
_partitionName = partitionName;
_replicaState = replicaState;
_statePriority = statePriority;
_resourceName = resourceConfig.getResourceName();
- _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+ _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
_resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
}
@@ -127,7 +130,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
* Parse the resource config for the partition weight.
*/
private Map<String, Integer> fetchCapacityUsage(String partitionName,
- ResourceConfig resourceConfig) {
+ ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
Map<String, Map<String, Integer>> capacityMap;
try {
capacityMap = resourceConfig.getPartitionCapacityMap();
@@ -146,6 +149,16 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
"The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.",
partitionName, resourceConfig.getResourceName()));
}
+
+ List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+ // Remove the non-required capacity items.
+ partitionCapacity.keySet().retainAll(requiredCapacityKeys);
+ // If any required capacity key is not configured in the resource config, fill the partition
+ // capacity map with 0 usage.
+ for (String capacityKey : requiredCapacityKeys) {
+ partitionCapacity.putIfAbsent(capacityKey, 0);
+ }
+
return partitionCapacity;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 61f5d8d..3570164 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -30,14 +38,6 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
/**
* This util class generates Cluster Model object based on the controller's data cache.
*/
@@ -112,8 +112,8 @@ public class ClusterModelProvider {
Map<String, ResourceAssignment> bestPossibleAssignment,
Map<String, Set<AssignableReplica>> allocatedReplicas) {
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
- if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG)
- || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+ if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges
+ .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
// If the cluster topology has been modified, need to reassign all replicas
toBeAssignedReplicas
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
@@ -125,11 +125,9 @@ public class ClusterModelProvider {
// 2. if the resource does appear in the best possible assignment, need to reassign.
if (clusterChanges
.getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
- .contains(resourceName)
- || clusterChanges
+ .contains(resourceName) || clusterChanges
.getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
- .contains(resourceName)
- || !bestPossibleAssignment.containsKey(resourceName)) {
+ .contains(resourceName) || !bestPossibleAssignment.containsKey(resourceName)) {
toBeAssignedReplicas.addAll(replicas);
continue; // go to check next resource
} else {
@@ -196,9 +194,10 @@ public class ClusterModelProvider {
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
int instanceCount) {
Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
+ ClusterConfig clusterConfig = dataProvider.getClusterConfig();
for (String resourceName : resourceMap.keySet()) {
- ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+ ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName);
IdealState is = dataProvider.getIdealState(resourceName);
if (is == null) {
throw new HelixException(
@@ -220,7 +219,7 @@ public class ClusterModelProvider {
String state = entry.getKey();
for (int i = 0; i < entry.getValue(); i++) {
totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
- new AssignableReplica(config, partition, state,
+ new AssignableReplica(clusterConfig, resourceConfig, partition, state,
def.getStatePriorityMap().get(state)));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index d99a3fb..a8a5de5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -19,6 +19,15 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
@@ -29,15 +38,6 @@ import org.apache.helix.model.ResourceConfig;
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import static org.mockito.Mockito.when;
public abstract class AbstractTestClusterModel {
@@ -104,6 +104,7 @@ public abstract class AbstractTestClusterModel {
testClusterConfig.setMaxPartitionsPerInstance(5);
testClusterConfig.setDisabledInstances(Collections.emptyMap());
testClusterConfig.setTopologyAwareEnabled(false);
+ testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet()));
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
// 3. Mock the live instance node for the default instance.
@@ -179,8 +180,8 @@ public abstract class AbstractTestClusterModel {
ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
// Construct one AssignableReplica for each partition in the current state.
cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
- new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(),
- entry.getValue().equals("MASTER") ? 1 : 2)));
+ new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig, entry.getKey(),
+ entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
}
return assignmentSet;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 34a03a9..92a6998 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -19,22 +19,24 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.InstanceConfig;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
import static org.mockito.Mockito.when;
public class TestAssignableNode extends AbstractTestClusterModel {
@@ -87,9 +89,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
// Test 2 - release assignment from the AssignableNode
- AssignableReplica removingReplica =
- new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
- _partitionNames.get(2), "MASTER", 1);
+ AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
+ testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "MASTER", 1);
expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
expectedCapacityMap.put("item1", 9);
expectedCapacityMap.put("item2", 18);
@@ -128,9 +129,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
expectedTopStateAssignmentSet1.size() + expectedTopStateAssignmentSet2.size());
// Test 3 - add assignment to the AssignableNode
- AssignableReplica addingReplica =
- new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
- _partitionNames.get(2), "SLAVE", 2);
+ AssignableReplica addingReplica = new AssignableReplica(testCache.getClusterConfig(),
+ testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2), "SLAVE", 2);
expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
expectedCapacityMap.put("item1", 4);
expectedCapacityMap.put("item2", 8);
@@ -169,9 +169,9 @@ public class TestAssignableNode extends AbstractTestClusterModel {
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
Collections.emptyList());
- AssignableReplica removingReplica =
- new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
- _partitionNames.get(2) + "non-exist", "MASTER", 1);
+ AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
+ testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
+ "MASTER", 1);
// Release shall pass.
assignableNode.release(removingReplica);
@@ -184,9 +184,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
- AssignableReplica duplicateReplica =
- new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)),
- _partitionNames.get(0), "SLAVE", 2);
+ AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
+ testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
assignableNode.assign(duplicateReplica);
}
@@ -264,4 +263,18 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Collections.emptyList());
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
}
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The required capacity keys \\[item2, item1, item3, AdditionalCapacityKey\\] are not fully configured in the instance testInstanceId capacity map \\{item2=40, item1=20, item3=30\\}.")
+ public void testIncompleteInstanceCapacity() {
+ ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+ List<String> requiredCapacityKeys = new ArrayList<>(_capacityDataMap.keySet());
+ requiredCapacityKeys.add("AdditionalCapacityKey");
+ testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
+
+ InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+
+ new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
+ Collections.emptyList());
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
index d069ced..a247537 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
@@ -19,16 +19,19 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
public class TestAssignableReplica {
String resourceName = "Resource";
String partitionNamePrefix = "partition";
@@ -38,7 +41,7 @@ public class TestAssignableReplica {
int slavePriority = 2;
@Test
- public void testConstructRepliaWithResourceConfig() throws IOException {
+ public void testConstructReplicaWithResourceConfig() throws IOException {
// Init assignable replica with a basic config object
Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
capacityDataMapResource1.put("item1", 3);
@@ -46,11 +49,13 @@ public class TestAssignableReplica {
ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
testResourceConfigResource.setPartitionCapacityMap(
Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+ ClusterConfig testClusterConfig = new ClusterConfig("testCluster");
+ testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(capacityDataMapResource1.keySet()));
String partitionName = partitionNamePrefix + 1;
AssignableReplica replica =
- new AssignableReplica(testResourceConfigResource, partitionName, masterState,
- masterPriority);
+ new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName,
+ masterState, masterPriority);
Assert.assertEquals(replica.getResourceName(), resourceName);
Assert.assertEquals(replica.getPartitionName(), partitionName);
Assert.assertEquals(replica.getReplicaState(), masterState);
@@ -79,14 +84,14 @@ public class TestAssignableReplica {
.setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
maxPartition);
- replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState,
- masterPriority);
+ replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName,
+ masterState, masterPriority);
Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
- replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState,
- slavePriority);
+ replica = new AssignableReplica(testClusterConfig, testResourceConfigResource, partitionName2,
+ slaveState, slavePriority);
Assert.assertEquals(replica.getResourceName(), resourceName);
Assert.assertEquals(replica.getPartitionName(), partitionName2);
Assert.assertEquals(replica.getReplicaState(), slaveState);
@@ -96,4 +101,29 @@ public class TestAssignableReplica {
Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
}
+
+ @Test
+ public void testIncompletePartitionWeightConfig() throws IOException {
+ // Init assignable replica with a basic config object
+ Map<String, Integer> capacityDataMapResource = new HashMap<>();
+ capacityDataMapResource.put("item1", 3);
+ capacityDataMapResource.put("item2", 6);
+ ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
+ testResourceConfigResource.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource));
+ ClusterConfig testClusterConfig = new ClusterConfig("testCluster");
+ List<String> requiredCapacityKeys = new ArrayList<>(capacityDataMapResource.keySet());
+ // Remove one required key, so it becomes an unnecessary item.
+ String unnecessaryCapacityKey = requiredCapacityKeys.remove(0);
+ // Add one new required key, so it does not exist in the resource config.
+ String newCapacityKey = "newCapacityKey";
+ requiredCapacityKeys.add(newCapacityKey);
+ testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
+
+ AssignableReplica replica = new AssignableReplica(testClusterConfig, testResourceConfigResource,
+ partitionNamePrefix + 1, masterState, masterPriority);
+ Assert.assertTrue(replica.getCapacity().keySet().containsAll(requiredCapacityKeys));
+ Assert.assertEquals(replica.getCapacity().get(newCapacityKey).intValue(), 0);
+ Assert.assertFalse(replica.getCapacity().containsKey(unnecessaryCapacityKey));
+ }
}