You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:40 UTC

[helix] 02/50: Adding the configuration items of the WAGED rebalancer. (#348)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8d377cc83ee23d3ac782d789628c0e85a5adbd12
Author: jiajunwang <18...@users.noreply.github.com>
AuthorDate: Fri Jul 26 11:42:52 2019 -0700

    Adding the configuration items of the WAGED rebalancer. (#348)
    
    * Adding the configuration items of the WAGED rebalancer.
    
    Including: Instance Capacity Keys, Rebalance Preferences, Instance Capacity Details, Partition Capacity (the weight) Details.
    Also adding tests to cover the new configuration items.
---
 .../java/org/apache/helix/model/ClusterConfig.java | 129 +++++++++++---
 .../org/apache/helix/model/InstanceConfig.java     |  62 +++++--
 .../org/apache/helix/model/ResourceConfig.java     | 139 ++++++++++++++-
 .../org/apache/helix/model/TestClusterConfig.java  | 130 ++++++++++++++
 .../org/apache/helix/model/TestInstanceConfig.java |  66 +++++++-
 .../org/apache/helix/model/TestResourceConfig.java | 186 +++++++++++++++++++++
 6 files changed, 669 insertions(+), 43 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 8905dda..ee942c7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,12 +19,8 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
@@ -32,6 +28,12 @@ import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Cluster configurations
  */
@@ -80,7 +82,19 @@ public class ClusterConfig extends HelixProperty {
     DISABLED_INSTANCES,
 
     // Specifies job types and used for quota allocation
-    QUOTA_TYPES
+    QUOTA_TYPES,
+
+    // The required instance capacity keys for resource partition assignment calculation.
+    INSTANCE_CAPACITY_KEYS,
+    // The preference of the rebalance result.
+    // EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
+    // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
+    REBALANCE_PREFERENCE
+  }
+
+  public enum GlobalRebalancePreferenceKey {
+    EVENNESS,
+    LESS_MOVEMENT
   }
 
   private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
@@ -94,6 +108,15 @@ public class ClusterConfig extends HelixProperty {
 
   public final static String TASK_QUOTA_RATIO_NOT_SET = "-1";
 
+  // Default preference for all the aspects should be the same to ensure balanced setup.
+  public final static Map<GlobalRebalancePreferenceKey, Integer>
+      DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
+      ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder()
+          .put(GlobalRebalancePreferenceKey.EVENNESS, 1)
+          .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
+  private final static int MAX_REBALANCE_PREFERENCE = 10;
+  private final static int MIN_REBALANCE_PREFERENCE = 0;
+
   /**
    * Instantiate for a specific cluster
    * @param cluster the cluster identifier
@@ -112,21 +135,21 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Set task quota type with the ratio of this quota.
-   * @param quotaType String
+   * @param quotaType  String
    * @param quotaRatio int
    */
   public void setTaskQuotaRatio(String quotaType, int quotaRatio) {
     if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
       _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
     }
-    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType,
-        Integer.toString(quotaRatio));
+    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
+        .put(quotaType, Integer.toString(quotaRatio));
   }
 
   /**
    * Set task quota type with the ratio of this quota. Quota ratio must be a String that is
    * parse-able into an int.
-   * @param quotaType String
+   * @param quotaType  String
    * @param quotaRatio String
    */
   public void setTaskQuotaRatio(String quotaType, String quotaRatio) {
@@ -209,8 +232,8 @@ public class ClusterConfig extends HelixProperty {
    * @return
    */
   public Boolean isPersistIntermediateAssignment() {
-    return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(),
-        false);
+    return _record
+        .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
   }
 
   /**
@@ -232,8 +255,8 @@ public class ClusterConfig extends HelixProperty {
   }
 
   public Boolean isPipelineTriggersDisabled() {
-    return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(),
-        false);
+    return _record
+        .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
   }
 
   /**
@@ -402,8 +425,8 @@ public class ClusterConfig extends HelixProperty {
    * @return
    */
   public int getNumOfflineInstancesForAutoExit() {
-    return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(),
-        -1);
+    return _record
+        .getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1);
   }
 
   /**
@@ -443,9 +466,7 @@ public class ClusterConfig extends HelixProperty {
     if (obj instanceof ClusterConfig) {
       ClusterConfig that = (ClusterConfig) obj;
 
-      if (this.getId().equals(that.getId())) {
-        return true;
-      }
+      return this.getId().equals(that.getId());
     }
     return false;
   }
@@ -489,8 +510,8 @@ public class ClusterConfig extends HelixProperty {
     }
 
     if (!configStrs.isEmpty()) {
-      _record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(),
-          configStrs);
+      _record
+          .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs);
     }
   }
 
@@ -578,7 +599,7 @@ public class ClusterConfig extends HelixProperty {
   public int getErrorPartitionThresholdForLoadBalance() {
     return _record.getIntField(
         ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
-        DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
+            DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
   }
 
   /**
@@ -657,6 +678,70 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Set the required Instance Capacity Keys.
+   * @param capacityKeys the list of required capacity keys; must not be null or empty
+   */
+  public void setInstanceCapacityKeys(List<String> capacityKeys) {
+    if (capacityKeys == null || capacityKeys.isEmpty()) {
+      throw new IllegalArgumentException("The input instance capacity key list is empty.");
+    }
+    _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
+  }
+
+  /**
+   * @return The required Instance Capacity Keys. If not configured, return an empty list.
+   */
+  public List<String> getInstanceCapacityKeys() {
+    List<String> capacityKeys = _record.getListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name());
+    if (capacityKeys == null) {
+      return Collections.emptyList();
+    }
+    return capacityKeys;
+  }
+
+  /**
+   * Set the global rebalancer's assignment preference.
+   * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
+   *                   The ratio of the configured weights will determine the rebalancer's behavior.
+   */
+  public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
+    Map<String, String> preferenceMap = new HashMap<>();
+
+    preference.entrySet().stream().forEach(entry -> {
+      if (entry.getValue() > MAX_REBALANCE_PREFERENCE
+          || entry.getValue() < MIN_REBALANCE_PREFERENCE) {
+        throw new IllegalArgumentException(String
+            .format("Invalid global rebalance preference configuration. Key %s, Value %d.",
+                entry.getKey().name(), entry.getValue()));
+      }
+      preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
+    });
+
+    _record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
+  }
+
+  /**
+   * Get the global rebalancer's assignment preference.
+   */
+  public Map<GlobalRebalancePreferenceKey, Integer> getGlobalRebalancePreference() {
+    Map<String, String> preferenceStrMap =
+        _record.getMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
+    if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) {
+      Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) {
+        if (!preferenceStrMap.containsKey(key.name())) {
+          // If any key is not configured with a value, return the default config.
+          return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+        }
+        preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
+      }
+      return preference;
+    }
+    // If no preference is configured at all, return the default one.
+    return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    * @return
    */
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index f65a1bd..88fd1dd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -19,6 +19,14 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import com.google.common.base.Splitter;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -27,15 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.util.HelixUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Splitter;
+import java.util.stream.Collectors;
 
 /**
  * Instance configurations
@@ -55,7 +55,8 @@ public class InstanceConfig extends HelixProperty {
     INSTANCE_WEIGHT,
     DOMAIN,
     DELAY_REBALANCE_ENABLED,
-    MAX_CONCURRENT_TASK
+    MAX_CONCURRENT_TASK,
+    INSTANCE_CAPACITY_MAP
   }
 
   public static final int WEIGHT_NOT_SET = -1;
@@ -505,6 +506,47 @@ public class InstanceConfig extends HelixProperty {
     _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
   }
 
+  /**
+   * Get the instance capacity information from the map fields
+   *
+   * @return data map if it exists, or empty map
+   */
+  public Map<String, Integer> getInstanceCapacityMap() {
+    Map<String, String> capacityData =
+        _record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name());
+
+    if (capacityData != null) {
+      return capacityData.entrySet().stream().collect(
+          Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Set the instance capacity information with an Integer mapping
+   * @param capacityDataMap - map of instance capacity data
+   * @throws IllegalArgumentException - when any of the data values is a negative number or when the map is null or empty
+   */
+  public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
+      throws IllegalArgumentException {
+    if (capacityDataMap == null || capacityDataMap.size() == 0) {
+      throw new IllegalArgumentException("Capacity Data is empty");
+    }
+
+    Map<String, String> capacityData = new HashMap<>();
+
+    capacityDataMap.entrySet().stream().forEach(entry -> {
+      if (entry.getValue() < 0) {
+        throw new IllegalArgumentException(String
+            .format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
+                entry.getValue()));
+      }
+      capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
+    });
+
+    _record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof InstanceConfig) {
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 274640c..1ead08e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -19,19 +19,23 @@ package org.apache.helix.model;
  * under the License.
  */
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import java.util.TreeMap;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 /**
  * Resource configurations
  */
@@ -53,7 +57,8 @@ public class ResourceConfig extends HelixProperty {
     RESOURCE_TYPE,
     GROUP_ROUTING_ENABLED,
     EXTERNAL_VIEW_DISABLED,
-    DELAY_REBALANCE_ENABLED
+    DELAY_REBALANCE_ENABLED,
+    PARTITION_CAPACITY_MAP
   }
 
   public enum ResourceConfigConstants {
@@ -61,6 +66,10 @@ public class ResourceConfig extends HelixProperty {
   }
 
   private static final Logger _logger = LoggerFactory.getLogger(ResourceConfig.class.getName());
+  private static final ObjectMapper _objectMapper = new ObjectMapper();
+
+  public static final String DEFAULT_PARTITION_KEY = "DEFAULT";
+
   /**
    * Instantiate for a specific instance
    *
@@ -92,10 +101,24 @@ public class ResourceConfig extends HelixProperty {
       String stateModelDefRef, String stateModelFactoryName, String numReplica,
       int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag,
       Boolean helixEnabled, String resourceGroupName, String resourceType,
-      Boolean groupRoutingEnabled, Boolean externalViewDisabled,
-      RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
+      Boolean groupRoutingEnabled, Boolean externalViewDisabled, RebalanceConfig rebalanceConfig,
+      StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
       Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields,
       Boolean p2pMessageEnabled) {
+    this(resourceId, monitorDisabled, numPartitions, stateModelDefRef, stateModelFactoryName,
+        numReplica, minActiveReplica, maxPartitionsPerInstance, instanceGroupTag, helixEnabled,
+        resourceGroupName, resourceType, groupRoutingEnabled, externalViewDisabled, rebalanceConfig,
+        stateTransitionTimeoutConfig, listFields, mapFields, p2pMessageEnabled, null);
+  }
+
+  private ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartitions,
+    String stateModelDefRef, String stateModelFactoryName, String numReplica,
+    int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag,
+        Boolean helixEnabled, String resourceGroupName, String resourceType,
+        Boolean groupRoutingEnabled, Boolean externalViewDisabled,
+        RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
+        Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields,
+        Boolean p2pMessageEnabled, Map<String, Map<String, Integer>> partitionCapacityMap) {
     super(resourceId);
 
     if (monitorDisabled != null) {
@@ -172,6 +195,15 @@ public class ResourceConfig extends HelixProperty {
     if (mapFields != null) {
       _record.setMapFields(mapFields);
     }
+
+    if (partitionCapacityMap != null) {
+      try {
+        setPartitionCapacityMap(partitionCapacityMap);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Failed to set partition capacity. Invalid capacity configuration.");
+      }
+    }
   }
 
 
@@ -350,6 +382,64 @@ public class ResourceConfig extends HelixProperty {
   }
 
   /**
+   * Get the partition capacity information, which is stored as JSON strings in a map field.
+   * <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+   *
+   * @return data map if it exists, or empty map
+   * @throws IOException - when JSON conversion fails
+   */
+  public Map<String, Map<String, Integer>> getPartitionCapacityMap() throws IOException {
+    Map<String, String> partitionCapacityData =
+        _record.getMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name());
+    Map<String, Map<String, Integer>> partitionCapacityMap = new HashMap<>();
+    if (partitionCapacityData != null) {
+      for (String partition : partitionCapacityData.keySet()) {
+        Map<String, Integer> capacities = _objectMapper
+            .readValue(partitionCapacityData.get(partition),
+                new TypeReference<Map<String, Integer>>() {
+                });
+        partitionCapacityMap.put(partition, capacities);
+      }
+    }
+    return partitionCapacityMap;
+  }
+
+  /**
+   * Set the partition capacity information with a map <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+   *
+   * @param partitionCapacityMap - map of partition capacity data
+   * @throws IllegalArgumentException - when the map or any per-partition map is null or empty, the DEFAULT_PARTITION_KEY entry is missing, or any value is negative
+   * @throws IOException              - when JSON serialization fails
+   */
+  public void setPartitionCapacityMap(Map<String, Map<String, Integer>> partitionCapacityMap)
+      throws IllegalArgumentException, IOException {
+    if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) {
+      throw new IllegalArgumentException("Capacity Map is empty");
+    }
+    if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) {
+      throw new IllegalArgumentException(String
+          .format("The default partition capacity with the default key %s is required.",
+              DEFAULT_PARTITION_KEY));
+    }
+
+    Map<String, String> newCapacityRecord = new HashMap<>();
+    for (String partition : partitionCapacityMap.keySet()) {
+      Map<String, Integer> capacities = partitionCapacityMap.get(partition);
+      // Verify the input is valid
+      if (capacities.isEmpty()) {
+        throw new IllegalArgumentException("Capacity Data is empty");
+      }
+      if (capacities.entrySet().stream().anyMatch(entry -> entry.getValue() < 0)) {
+        throw new IllegalArgumentException(
+            String.format("Capacity Data contains a negative value:%s", capacities.toString()));
+      }
+      newCapacityRecord.put(partition, _objectMapper.writeValueAsString(capacities));
+    }
+
+    _record.setMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), newCapacityRecord);
+  }
+
+  /**
    * Put a set of simple configs.
    *
    * @param configsMap
@@ -476,6 +566,7 @@ public class ResourceConfig extends HelixProperty {
     private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig;
     private Map<String, List<String>> _preferenceLists;
     private Map<String, Map<String, String>> _mapFields;
+    private Map<String, Map<String, Integer>> _partitionCapacityMap;
 
     public Builder(String resourceId) {
       _resourceId = resourceId;
@@ -664,6 +755,23 @@ public class ResourceConfig extends HelixProperty {
       return _preferenceLists;
     }
 
+    public Builder setPartitionCapacity(Map<String, Integer> defaultCapacity) {
+      setPartitionCapacity(DEFAULT_PARTITION_KEY, defaultCapacity);
+      return this;
+    }
+
+    public Builder setPartitionCapacity(String partition, Map<String, Integer> capacity) {
+      if (_partitionCapacityMap == null) {
+        _partitionCapacityMap = new HashMap<>();
+      }
+      _partitionCapacityMap.put(partition, capacity);
+      return this;
+    }
+
+    public Map<String, Integer> getPartitionCapacity(String partition) {
+      return _partitionCapacityMap.get(partition);
+    }
+
     public Builder setMapField(String key, Map<String, String> fields) {
       if (_mapFields == null) {
         _mapFields = new TreeMap<>();
@@ -708,6 +816,19 @@ public class ResourceConfig extends HelixProperty {
           }
         }
       }
+
+      if (_partitionCapacityMap != null) {
+        if (_partitionCapacityMap.keySet().stream()
+            .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured without the DEFAULT capacity!");
+        }
+        if (_partitionCapacityMap.values().stream()
+            .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+          throw new IllegalArgumentException(
+              "Partition capacity is configured with negative capacity value!");
+        }
+      }
     }
 
     public ResourceConfig build() {
@@ -718,7 +839,7 @@ public class ResourceConfig extends HelixProperty {
           _stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance,
           _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled,
           _externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists,
-          _mapFields, _p2pMessageEnabled);
+          _mapFields, _p2pMessageEnabled, _partitionCapacityMap);
     }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
new file mode 100644
index 0000000..209b196
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -0,0 +1,130 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS;
+import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT;
+
+public class TestClusterConfig {
+
+  @Test
+  public void testGetCapacityKeys() {
+    List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random");
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord()
+        .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), keys);
+
+    Assert.assertEquals(testConfig.getInstanceCapacityKeys(), keys);
+  }
+
+  @Test
+  public void testGetCapacityKeysEmpty() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    Assert.assertEquals(testConfig.getInstanceCapacityKeys(), Collections.emptyList());
+  }
+
+  @Test
+  public void testSetCapacityKeys() {
+    List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random");
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setInstanceCapacityKeys(keys);
+
+    Assert.assertEquals(keys, testConfig.getRecord()
+        .getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetCapacityKeysEmptyList() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setInstanceCapacityKeys(Collections.emptyList());
+  }
+
+  @Test
+  public void testGetRebalancePreference() {
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+    preference.put(EVENNESS, 5);
+    preference.put(LESS_MOVEMENT, 3);
+
+    Map<String, String> mapFieldData = new HashMap<>();
+    for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) {
+      mapFieldData.put(key.name(), String.valueOf(preference.get(key)));
+    }
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord()
+        .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), mapFieldData);
+
+    Assert.assertEquals(testConfig.getGlobalRebalancePreference(), preference);
+  }
+
+  @Test
+  public void testGetRebalancePreferenceDefault() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
+        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+    preference.put(EVENNESS, 5);
+    testConfig.setGlobalRebalancePreference(preference);
+
+    Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
+        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+  }
+
+  @Test
+  public void testSetRebalancePreference() {
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+    preference.put(EVENNESS, 5);
+    preference.put(LESS_MOVEMENT, 3);
+
+    Map<String, String> mapFieldData = new HashMap<>();
+    for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) {
+      mapFieldData.put(key.name(), String.valueOf(preference.get(key)));
+    }
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalRebalancePreference(preference);
+
+    Assert.assertEquals(testConfig.getRecord()
+            .getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()),
+        mapFieldData);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetRebalancePreferenceInvalidNumber() {
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+    preference.put(EVENNESS, -1);
+    preference.put(LESS_MOVEMENT, 3);
+
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalRebalancePreference(preference);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 38b1c92..f0da05f 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -19,12 +19,14 @@ package org.apache.helix.model;
  * under the License.
  */
 
-import java.util.Map;
-
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Created with IntelliJ IDEA.
@@ -58,4 +60,64 @@ public class TestInstanceConfig {
     Map<String, String> parsedDomain = instanceConfig.getDomainAsMap();
     Assert.assertTrue(parsedDomain.isEmpty());
   }
+
+  @Test
+  public void testGetInstanceCapacityMap() {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1",
+        "item2", "2",
+        "item3", "3");
+
+    ZNRecord rec = new ZNRecord("testId");
+    rec.setMapField(InstanceConfig.InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityDataMapString);
+    InstanceConfig testConfig = new InstanceConfig(rec);
+
+    Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(capacityDataMap));
+  }
+
+  @Test
+  public void testGetInstanceCapacityMapEmpty() {
+    InstanceConfig testConfig = new InstanceConfig("testId");
+
+    Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(Collections.emptyMap()));
+  }
+
+  @Test
+  public void testSetInstanceCapacityMap() {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1",
+        "item2", "2",
+        "item3", "3");
+
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setInstanceCapacityMap(capacityDataMap);
+
+    Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty.
+        INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty")
+  public void testSetInstanceCapacityMapEmpty() {
+    Map<String, Integer> capacityDataMap = new HashMap<>();
+
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setInstanceCapacityMap(capacityDataMap);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = "Capacity Data contains a negative value: item3 = -3")
+  public void testSetInstanceCapacityMapInvalid() {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", -3);
+
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setInstanceCapacityMap(capacityDataMap);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java
new file mode 100644
index 0000000..8099486
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java
@@ -0,0 +1,186 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestResourceConfig {
+  private static final ObjectMapper _objectMapper = new ObjectMapper();
+
+  @Test
+  public void testGetPartitionCapacityMap() throws IOException {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    ZNRecord rec = new ZNRecord("testId");
+    rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections
+        .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+            _objectMapper.writeValueAsString(capacityDataMap)));
+    ResourceConfig testConfig = new ResourceConfig(rec);
+
+    Assert.assertTrue(testConfig.getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY)
+        .equals(capacityDataMap));
+  }
+
+  @Test
+  public void testGetPartitionCapacityMapEmpty() throws IOException {
+    ResourceConfig testConfig = new ResourceConfig("testId");
+
+    Assert.assertTrue(testConfig.getPartitionCapacityMap().equals(Collections.emptyMap()));
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testGetPartitionCapacityMapInvalidJson() throws IOException {
+    ZNRecord rec = new ZNRecord("testId");
+    rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(),
+        Collections.singletonMap("test", "gibberish"));
+    ResourceConfig testConfig = new ResourceConfig(rec);
+
+    testConfig.getPartitionCapacityMap();
+  }
+
+  @Test(dependsOnMethods = "testGetPartitionCapacityMap", expectedExceptions = IOException.class)
+  public void testGetPartitionCapacityMapInvalidJsonType() throws IOException {
+    Map<String, String> capacityDataMap = ImmutableMap.of("item1", "1",
+        "item2", "2",
+        "item3", "three");
+
+    ZNRecord rec = new ZNRecord("testId");
+    rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections
+        .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+            _objectMapper.writeValueAsString(capacityDataMap)));
+    ResourceConfig testConfig = new ResourceConfig(rec);
+
+    testConfig.getPartitionCapacityMap();
+  }
+
+  @Test
+  public void testSetPartitionCapacityMap() throws IOException {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    ResourceConfig testConfig = new ResourceConfig("testConfig");
+    testConfig.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+
+    Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+            PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY),
+        _objectMapper.writeValueAsString(capacityDataMap));
+  }
+
+  @Test
+  public void testSetMultiplePartitionCapacityMap() throws IOException {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    Map<String, Map<String, Integer>> totalCapacityMap =
+        ImmutableMap.of(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap,
+        "partition2", capacityDataMap,
+        "partition3", capacityDataMap);
+
+    ResourceConfig testConfig = new ResourceConfig("testConfig");
+    testConfig.setPartitionCapacityMap(totalCapacityMap);
+
+    Assert.assertNull(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+        PARTITION_CAPACITY_MAP.name()).get("partition1"));
+    Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+        PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY),
+        _objectMapper.writeValueAsString(capacityDataMap));
+    Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+            PARTITION_CAPACITY_MAP.name()).get("partition2"),
+        _objectMapper.writeValueAsString(capacityDataMap));
+    Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+            PARTITION_CAPACITY_MAP.name()).get("partition3"),
+        _objectMapper.writeValueAsString(capacityDataMap));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty")
+  public void testSetPartitionCapacityMapEmpty() throws IOException {
+    Map<String, Integer> capacityDataMap = new HashMap<>();
+
+    ResourceConfig testConfig = new ResourceConfig("testConfig");
+    testConfig.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.")
+  public void testSetPartitionCapacityMapWithoutDefault() throws IOException {
+    Map<String, Integer> capacityDataMap = new HashMap<>();
+
+    ResourceConfig testConfig = new ResourceConfig("testConfig");
+    testConfig.setPartitionCapacityMap(
+        Collections.singletonMap("Random", capacityDataMap));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data contains a negative value:.+")
+  public void testSetPartitionCapacityMapInvalid() throws IOException {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", -3);
+
+    ResourceConfig testConfig = new ResourceConfig("testConfig");
+    testConfig.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+  }
+
+  @Test
+  public void testWithResourceBuilder() throws IOException {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig");
+    builder.setPartitionCapacity(capacityDataMap);
+    builder.setPartitionCapacity("partition1", capacityDataMap);
+
+    Assert.assertEquals(
+        builder.build().getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY),
+        capacityDataMap);
+    Assert.assertEquals(
+        builder.build().getPartitionCapacityMap().get("partition1"),
+        capacityDataMap);
+    Assert.assertNull(
+        builder.build().getPartitionCapacityMap().get("Random"));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.")
+  public void testWithResourceBuilderInvalidInput() {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+        "item2", 2,
+        "item3", 3);
+
+    ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig");
+    builder.setPartitionCapacity("Random", capacityDataMap);
+
+    builder.build();
+  }
+}