Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/13 19:17:58 UTC

[helix] branch wagedRebalancer updated: Add Java API for adding and validating resources for WAGED rebalancer (#570)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 043f445  Add Java API for adding and validating resources for WAGED rebalancer (#570)
043f445 is described below

commit 043f445d336317fd48a02eb42d6bc2059726c79a
Author: Hunter Lee <na...@gmail.com>
AuthorDate: Wed Nov 13 11:17:49 2019 -0800

    Add Java API for adding and validating resources for WAGED rebalancer (#570)
    
    Add Java API methods for adding and validating resources for the WAGED rebalancer. These are convenience APIs, exposed through HelixAdmin, that users can call to more easily add resources and validate them for WAGED rebalancer usage (see the usage sketch below).
    Changelist:
    1. Add API methods in HelixAdmin
    2. Implement the said methods
    3. Add tests
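    
    For illustration only (not part of this commit), here is a minimal sketch of how the new
    HelixAdmin APIs might be called. The ZooKeeper address, cluster name, resource name,
    instance name, and capacity keys ("WCU", "RCU", "STORAGE") are placeholder assumptions,
    mirroring the values used in TestZkHelixAdmin below:
    
        import java.util.Collections;
        import java.util.Map;
        import com.google.common.collect.ImmutableMap;
        import org.apache.helix.HelixAdmin;
        import org.apache.helix.manager.zk.ZKHelixAdmin;
        import org.apache.helix.model.IdealState;
        import org.apache.helix.model.ResourceConfig;
        import org.codehaus.jackson.map.ObjectMapper;
        
        public class WagedAdminExample {
          public static void main(String[] args) throws Exception {
            // Assumes the cluster, its MasterSlave state model, and instance "instance_0"
            // already exist, and that ClusterConfig lists WCU/RCU/STORAGE as capacity keys.
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // assumed ZK address
            String cluster = "MyCluster";                          // assumed cluster name
        
            // Describe the resource. Replicas must parse as an integer because
            // addResourceWithWeight() triggers a rebalance with that replica count.
            IdealState idealState = new IdealState("TestResource");
            idealState.setNumPartitions(3);
            idealState.setReplicas("1");
            idealState.setStateModelDefRef("MasterSlave");
            idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
        
            // The ResourceConfig must carry a PARTITION_CAPACITY_MAP covering every capacity
            // key listed in ClusterConfig; DEFAULT_PARTITION_KEY applies the same weights to
            // all partitions (same pattern as the test added in this commit).
            ResourceConfig resourceConfig = new ResourceConfig("TestResource");
            Map<String, String> weights = ImmutableMap.of("WCU", "1", "RCU", "2", "STORAGE", "3");
            resourceConfig.getRecord().setMapField(
                ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(),
                Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
                    new ObjectMapper().writeValueAsString(weights)));
        
            // Validates the weights against ClusterConfig and, if valid, writes the configs
            // and rebalances; throws HelixException if the weight configuration is incomplete.
            admin.addResourceWithWeight(cluster, idealState, resourceConfig);
        
            // Per-resource / per-instance validation of existing weight configurations.
            Map<String, Boolean> resourceCheck = admin.validateResourcesForWagedRebalance(
                cluster, Collections.singletonList("TestResource"));
            Map<String, Boolean> instanceCheck = admin.validateInstancesForWagedRebalance(
                cluster, Collections.singletonList("instance_0"));
            System.out.println(resourceCheck + " " + instanceCheck);
        
            // Switch validated resources to the WAGED rebalancer in one batch.
            admin.enableWagedRebalance(cluster, Collections.singletonList("TestResource"));
          }
        }
    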
---
 .../src/main/java/org/apache/helix/HelixAdmin.java |  49 ++++
 .../rebalancer/util/WagedValidationUtil.java       |  91 +++++++
 .../rebalancer/waged/model/AssignableNode.java     |  36 +--
 .../rebalancer/waged/model/AssignableReplica.java  |  24 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 282 ++++++++++++++++++---
 .../rebalancer/waged/model/TestAssignableNode.java |   2 +-
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  | 109 ++++++++
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  22 ++
 8 files changed, 538 insertions(+), 77 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 7402c19..1b0544d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
@@ -30,8 +31,10 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 
+
 /*
  * Helix cluster management
  */
@@ -576,4 +579,50 @@ public interface HelixAdmin {
    * Release resources
    */
   void close();
+
+  /**
+   * Adds a resource, given its IdealState and ResourceConfig, to be rebalanced by the WAGED rebalancer, with validation.
+   * Validation includes the following:
+   * 1. Check ResourceConfig has the WEIGHT field
+   * 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
+   * 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
+   * @param clusterName
+   * @param idealState
+   * @param resourceConfig
+   * @return true if the resource has been added successfully, false otherwise
+   */
+  boolean addResourceWithWeight(String clusterName, IdealState idealState,
+      ResourceConfig resourceConfig);
+
+  /**
+   * Batch-enables the WAGED rebalancer for the given resources.
+   * @param clusterName
+   * @param resourceNames
+   * @return true if the rebalance configuration was updated successfully for all given resources, false otherwise
+   */
+  boolean enableWagedRebalance(String clusterName, List<String> resourceNames);
+
+  /**
+   * Validates the resources to see if their weight configs have been set properly.
+   * Validation includes the following:
+   * 1. Check ResourceConfig has the WEIGHT field
+   * 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
+   * 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
+   * @param resourceNames
+   * @return for each resource, true if the weight configs have been set properly, false otherwise
+   */
+  Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
+      List<String> resourceNames);
+
+  /**
+   * Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
+   * Validation includes the following:
+   * 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
+   * 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
+   * @param clusterName
+   * @param instancesNames
+   * @return for each instance, true if its weight configs have been set properly, false otherwise
+   */
+  Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+      List<String> instancesNames);
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
new file mode 100644
index 0000000..e9f86e7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+
+
+/**
+ * A util class that contains validation-related static methods for the WAGED rebalancer.
+ */
+public class WagedValidationUtil {
+  /**
+   * Validates and returns instance capacities. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in InstanceConfig.
+   * @param clusterConfig
+   * @param instanceConfig
+   * @return the instance's capacity map if validation succeeds
+   */
+  public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
+      InstanceConfig instanceConfig) {
+    // Fetch the capacity of instance from 2 possible sources according to the following priority.
+    // 1. The instance capacity that is configured in the instance config.
+    // 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
+    Map<String, Integer> instanceCapacity =
+        new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+    instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+
+    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+    // All the required keys must exist in the instance config.
+    if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
+      throw new HelixException(String.format(
+          "The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
+          requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
+          instanceCapacity.toString()));
+    }
+    return instanceCapacity;
+  }
+
+  /**
+   * Validates and returns partition capacities. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in the ResourceConfig for the partition.
+   * @param partitionName
+   * @param resourceConfig
+   * @param clusterConfig
+   * @return the partition's capacity map if validation succeeds
+   */
+  public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
+      ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
+      ClusterConfig clusterConfig) {
+    // Fetch the capacity of partition from 3 possible sources according to the following priority.
+    // 1. The partition capacity that is explicitly configured in the resource config.
+    // 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
+    // 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
+    Map<String, Integer> partitionCapacity =
+        new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
+    partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
+        capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
+
+    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+    // If any required capacity key is not configured in the resource config, fail the model creation.
+    if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
+      throw new HelixException(String.format(
+          "The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
+          requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
+          partitionCapacity.toString()));
+    }
+    return partitionCapacity;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 6beda6a..09a3cba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -31,11 +31,13 @@ import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
@@ -113,7 +115,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   void assign(AssignableReplica assignableReplica) {
     addToAssignmentRecord(assignableReplica);
     assignableReplica.getCapacity().entrySet().stream()
-            .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+        .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
   }
 
   /**
@@ -121,7 +123,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * If the replication is not on this node, the assignable node is not updated.
    * @param replica - the replica to be released
    */
-  void release(AssignableReplica replica) throws IllegalArgumentException {
+  void release(AssignableReplica replica)
+      throws IllegalArgumentException {
     String resourceName = replica.getResourceName();
     String partitionName = replica.getPartitionName();
 
@@ -320,12 +323,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private void addToAssignmentRecord(AssignableReplica replica) {
     String resourceName = replica.getResourceName();
     String partitionName = replica.getPartitionName();
-    if (_currentAssignedReplicaMap.containsKey(resourceName)
-        && _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
-      throw new HelixException(String.format(
-          "Resource %s already has a replica with state %s from partition %s on node %s",
-          replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
-          getInstanceName()));
+    if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
+        .get(resourceName).containsKey(partitionName)) {
+      throw new HelixException(String
+          .format("Resource %s already has a replica with state %s from partition %s on node %s",
+              replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
+              getInstanceName()));
     } else {
       _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
           .put(partitionName, replica);
@@ -348,23 +351,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
       InstanceConfig instanceConfig) {
-    // Fetch the capacity of instance from 2 possible sources according to the following priority.
-    // 1. The instance capacity that is configured in the instance config.
-    // 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
     Map<String, Integer> instanceCapacity =
-        new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
-    instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
-
-    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
-    // All the required keys must exist in the instance config.
-    if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
-      throw new HelixException(String.format(
-          "The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
-          requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
-    }
+        WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
     // Remove all the non-required capacity items from the map.
-    instanceCapacity.keySet().retainAll(requiredCapacityKeys);
-
+    instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
     return instanceCapacity;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index b679a54..12b5105 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -149,27 +150,10 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
           "Invalid partition capacity configuration of resource: " + resourceConfig
               .getResourceName(), ex);
     }
-
-    // Fetch the capacity of partition from 3 possible sources according to the following priority.
-    // 1. The partition capacity that is explicitly configured in the resource config.
-    // 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
-    // 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
-    Map<String, Integer> partitionCapacity =
-        new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
-    partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
-        capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
-
-    List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
-    // If any required capacity key is not configured in the resource config, fail the model creating.
-    if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
-      throw new HelixException(String.format(
-          "The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
-          requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
-          partitionCapacity.toString()));
-    }
+    Map<String, Integer> partitionCapacity = WagedValidationUtil
+        .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
     // Remove the non-required capacity items.
-    partitionCapacity.keySet().retainAll(requiredCapacityKeys);
-
+    partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
     return partitionCapacity;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 12ab636..bbdb3f7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
@@ -56,6 +57,10 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.ClusterConfig;
@@ -75,6 +80,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
@@ -83,6 +89,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ZKHelixAdmin implements HelixAdmin {
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
@@ -173,7 +180,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           // does not repeatedly write instance history)
           logger.warn("Retrying dropping instance {} with exception {}",
               instanceConfig.getInstanceName(), e.getCause().getMessage());
-          retryCnt ++;
+          retryCnt++;
         } else {
           logger.error("Failed to drop instance {} (not retryable).",
               instanceConfig.getInstanceName(), e.getCause());
@@ -396,7 +403,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
-    return accessor.getBaseDataAccessor().exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
+    return accessor.getBaseDataAccessor()
+        .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
   }
 
   @Override
@@ -429,16 +437,16 @@ public class ZKHelixAdmin implements HelixAdmin {
    * @param customFields
    * @param triggeringEntity
    */
-  private void processMaintenanceMode(String clusterName, final boolean enabled, final String reason,
-      final MaintenanceSignal.AutoTriggerReason internalReason, final Map<String, String> customFields,
+  private void processMaintenanceMode(String clusterName, final boolean enabled,
+      final String reason, final MaintenanceSignal.AutoTriggerReason internalReason,
+      final Map<String, String> customFields,
       final MaintenanceSignal.TriggeringEntity triggeringEntity) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
         triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
-            : "manually",
-        enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
+            : "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
     final long currentTime = System.currentTimeMillis();
     if (!enabled) {
       // Exit maintenance mode
@@ -452,23 +460,23 @@ public class ZKHelixAdmin implements HelixAdmin {
       maintenanceSignal.setTimestamp(currentTime);
       maintenanceSignal.setTriggeringEntity(triggeringEntity);
       switch (triggeringEntity) {
-      case CONTROLLER:
-        // autoEnable
-        maintenanceSignal.setAutoTriggerReason(internalReason);
-        break;
-      case USER:
-      case UNKNOWN:
-        // manuallyEnable
-        if (customFields != null && !customFields.isEmpty()) {
-          // Enter all custom fields provided by the user
-          Map<String, String> simpleFields = maintenanceSignal.getRecord().getSimpleFields();
-          for (Map.Entry<String, String> entry : customFields.entrySet()) {
-            if (!simpleFields.containsKey(entry.getKey())) {
-              simpleFields.put(entry.getKey(), entry.getValue());
+        case CONTROLLER:
+          // autoEnable
+          maintenanceSignal.setAutoTriggerReason(internalReason);
+          break;
+        case USER:
+        case UNKNOWN:
+          // manuallyEnable
+          if (customFields != null && !customFields.isEmpty()) {
+            // Enter all custom fields provided by the user
+            Map<String, String> simpleFields = maintenanceSignal.getRecord().getSimpleFields();
+            for (Map.Entry<String, String> entry : customFields.entrySet()) {
+              if (!simpleFields.containsKey(entry.getKey())) {
+                simpleFields.put(entry.getKey(), entry.getValue());
+              }
             }
           }
-        }
-        break;
+          break;
       }
       if (!accessor.createMaintenance(maintenanceSignal)) {
         throw new HelixException("Failed to create maintenance signal!");
@@ -476,16 +484,17 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     // Record a MaintenanceSignal history
-    if (!accessor.getBaseDataAccessor().update(keyBuilder.controllerLeaderHistory().getPath(),
-        new DataUpdater<ZNRecord>() {
+    if (!accessor.getBaseDataAccessor()
+        .update(keyBuilder.controllerLeaderHistory().getPath(), new DataUpdater<ZNRecord>() {
           @Override
           public ZNRecord update(ZNRecord oldRecord) {
             try {
               if (oldRecord == null) {
                 oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
               }
-              return new ControllerHistory(oldRecord).updateMaintenanceHistory(enabled, reason,
-                  currentTime, internalReason, customFields, triggeringEntity);
+              return new ControllerHistory(oldRecord)
+                  .updateMaintenanceHistory(enabled, reason, currentTime, internalReason,
+                      customFields, triggeringEntity);
             } catch (IOException e) {
               logger.error("Failed to update maintenance history! Exception: {}", e);
               return oldRecord;
@@ -1234,7 +1243,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
   }
 
-  private static byte[] readFile(String filePath) throws IOException {
+  private static byte[] readFile(String filePath)
+      throws IOException {
     File file = new File(filePath);
 
     int size = (int) file.length();
@@ -1257,7 +1267,8 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void addStateModelDef(String clusterName, String stateModelDefName,
-      String stateModelDefFile) throws IOException {
+      String stateModelDefFile)
+      throws IOException {
     ZNRecord record =
         (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
     if (record == null || record.getId() == null || !record.getId().equals(stateModelDefName)) {
@@ -1280,9 +1291,9 @@ public class ZKHelixAdmin implements HelixAdmin {
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
-        ClusterConstraints constraints = currentData == null ?
-            new ClusterConstraints(constraintType) :
-            new ClusterConstraints(currentData);
+        ClusterConstraints constraints =
+            currentData == null ? new ClusterConstraints(constraintType)
+                : new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();
@@ -1488,9 +1499,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           + ", instance config does not exist");
     }
 
-    baseAccessor.update(path, new DataUpdater<ZNRecord>()
-
-    {
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         if (currentData == null) {
@@ -1578,4 +1587,211 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
   }
 
+  @Override
+  public boolean addResourceWithWeight(String clusterName, IdealState idealState,
+      ResourceConfig resourceConfig) {
+    // Null checks
+    if (clusterName == null || clusterName.isEmpty()) {
+      throw new HelixException("Cluster name is null or empty!");
+    }
+    if (idealState == null || !idealState.isValid()) {
+      throw new HelixException("IdealState is null or invalid!");
+    }
+    if (resourceConfig == null || !resourceConfig.isValid()) {
+      // TODO This might be okay because of default weight?
+      throw new HelixException("ResourceConfig is null or invalid!");
+    }
+
+    // Make sure IdealState and ResourceConfig are for the same resource
+    if (!idealState.getResourceName().equals(resourceConfig.getResourceName())) {
+      throw new HelixException("Resource names in IdealState and ResourceConfig are different!");
+    }
+
+    // Order in which a resource should be added:
+    // 1. Validate the weights in ResourceConfig against ClusterConfig
+    // Check that all capacity keys in ClusterConfig are set up in every partition in ResourceConfig field
+    if (!validateWeightForResourceConfig(_configAccessor.getClusterConfig(clusterName),
+        resourceConfig, idealState)) {
+      throw new HelixException(String
+          .format("Could not add resource %s with weight! Failed to validate the ResourceConfig!",
+              idealState.getResourceName()));
+    }
+
+    // 2. Add the resourceConfig to ZK
+    _configAccessor
+        .setResourceConfig(clusterName, resourceConfig.getResourceName(), resourceConfig);
+
+    // 3. Add the idealState to ZK
+    setResourceIdealState(clusterName, idealState.getResourceName(), idealState);
+
+    // 4. rebalance the resource
+    rebalance(clusterName, idealState.getResourceName(), Integer.parseInt(idealState.getReplicas()),
+        idealState.getResourceName(), idealState.getInstanceGroupTag());
+
+    return true;
+  }
+
+  @Override
+  public boolean enableWagedRebalance(String clusterName, List<String> resourceNames) {
+    // Null checks
+    if (clusterName == null || clusterName.isEmpty()) {
+      throw new HelixException("Cluster name is invalid!");
+    }
+    if (resourceNames == null || resourceNames.isEmpty()) {
+      throw new HelixException("Resource name list is invalid!");
+    }
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
+    List<String> nullIdealStates = new ArrayList<>();
+    for (int i = 0; i < idealStates.size(); i++) {
+      if (idealStates.get(i) == null) {
+        nullIdealStates.add(resourceNames.get(i));
+      } else {
+        idealStates.get(i).setRebalancerClassName(WagedRebalancer.class.getName());
+        idealStates.get(i).setRebalanceMode(RebalanceMode.FULL_AUTO);
+      }
+    }
+    if (!nullIdealStates.isEmpty()) {
+      throw new HelixException(
+          String.format("Not all IdealStates exist in the cluster: %s", nullIdealStates));
+    }
+    List<PropertyKey> idealStateKeys = new ArrayList<>();
+    idealStates.forEach(
+        idealState -> idealStateKeys.add(keyBuilder.idealStates(idealState.getResourceName())));
+    boolean[] success = accessor.setChildren(idealStateKeys, idealStates);
+    for (boolean s : success) {
+      if (!s) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
+      List<String> resourceNames) {
+    // Null checks
+    if (clusterName == null || clusterName.isEmpty()) {
+      throw new HelixException("Cluster name is invalid!");
+    }
+    if (resourceNames == null || resourceNames.isEmpty()) {
+      throw new HelixException("Resource name list is invalid!");
+    }
+
+    // Ensure that all instances are valid
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+    if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
+      throw new HelixException(String
+          .format("Instance capacities haven't been configured properly for cluster %s",
+              clusterName));
+    }
+
+    Map<String, Boolean> result = new HashMap<>();
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+    for (String resourceName : resourceNames) {
+      IdealState idealState = getResourceIdealState(clusterName, resourceName);
+      if (idealState == null || !idealState.isValid()) {
+        result.put(resourceName, false);
+        continue;
+      }
+      ResourceConfig resourceConfig = _configAccessor.getResourceConfig(clusterName, resourceName);
+      result.put(resourceName,
+          validateWeightForResourceConfig(clusterConfig, resourceConfig, idealState));
+    }
+    return result;
+  }
+
+  @Override
+  public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+      List<String> instanceNames) {
+    // Null checks
+    if (clusterName == null || clusterName.isEmpty()) {
+      throw new HelixException("Cluster name is invalid!");
+    }
+    if (instanceNames == null || instanceNames.isEmpty()) {
+      throw new HelixException("Instance name list is invalid!");
+    }
+
+    Map<String, Boolean> result = new HashMap<>();
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+    for (String instanceName : instanceNames) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig == null || !instanceConfig.isValid()) {
+        result.put(instanceName, false);
+        continue;
+      }
+      WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
+      result.put(instanceName, true);
+    }
+
+    return result;
+  }
+
+  /**
+   * Validates ResourceConfig's weight field against the given ClusterConfig.
+   * @param clusterConfig
+   * @param resourceConfig
+   * @param idealState
+   * @return true if ResourceConfig has all the required fields. False otherwise.
+   */
+  private boolean validateWeightForResourceConfig(ClusterConfig clusterConfig,
+      ResourceConfig resourceConfig, IdealState idealState) {
+    if (resourceConfig == null) {
+      if (clusterConfig.getDefaultPartitionWeightMap().isEmpty()) {
+        logger.error(
+            "ResourceConfig for {} is null, and there are no default weights set in ClusterConfig!",
+            idealState.getResourceName());
+        return false;
+      }
+      // If ResourceConfig is null AND the default partition weight map is defined, and the map has all the required keys, we consider this valid since the default weights will be used
+      // Need to check the map contains all the required keys
+      if (clusterConfig.getDefaultPartitionWeightMap().keySet()
+          .containsAll(clusterConfig.getInstanceCapacityKeys())) {
+        // Contains all the required keys, so consider it valid since it will use the default weights
+        return true;
+      }
+      logger.error(
+          "ResourceConfig for {} is null, and ClusterConfig's default partition weight map doesn't have all the required keys!",
+          idealState.getResourceName());
+      return false;
+    }
+
+    // Parse the entire capacityMap from ResourceConfig
+    Map<String, Map<String, Integer>> capacityMap;
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      logger.error("Invalid partition capacity configuration of resource: {}",
+          idealState.getResourceName(), ex);
+      return false;
+    }
+
+    Set<String> capacityMapSet = new HashSet<>(capacityMap.keySet());
+    boolean hasDefaultCapacity = capacityMapSet.contains(ResourceConfig.DEFAULT_PARTITION_KEY);
+    // Remove DEFAULT key
+    capacityMapSet.remove(ResourceConfig.DEFAULT_PARTITION_KEY);
+
+    // Make sure capacityMap contains all partitions defined in IdealState
+    // Here, IdealState has not been rebalanced, so listFields might be null, in which case, we would get an emptyList from getPartitionSet()
+    // So check using numPartitions instead
+    // This check allows us to fail early on instead of having to loop through all partitions
+    if (capacityMapSet.size() != idealState.getNumPartitions() && !hasDefaultCapacity) {
+      logger.error(
+          "ResourceConfig for {} does not have all partitions defined in PartitionCapacityMap!",
+          idealState.getResourceName());
+      return false;
+    }
+
+    // Loop through all partitions and validate
+    capacityMap.keySet().forEach(partitionName -> WagedValidationUtil
+        .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap,
+            clusterConfig));
+    return true;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 187c423..2b93353 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -272,7 +272,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     requiredCapacityKeys.add("AdditionalCapacityKey");
     testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
 
-    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    InstanceConfig testInstanceConfig = new InstanceConfig(_testInstanceId);
     testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
 
     new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index d372d67..c391085 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -19,12 +19,16 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -38,7 +42,10 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -48,18 +55,22 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestZkHelixAdmin extends ZkUnitTestBase {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   @BeforeClass
   public void beforeClass() {
@@ -505,4 +516,102 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
         .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
         2);
   }
+
+  /**
+   * Test addResourceWithWeight() and validateResourcesForWagedRebalance() by trying to add a resource with an incomplete ResourceConfig.
+   */
+  @Test
+  public void testAddResourceWithWeightAndValidation()
+      throws IOException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String mockInstance = "MockInstance";
+    String testResourcePrefix = "TestResource";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+
+    // Create a dummy instance
+    InstanceConfig instanceConfig = new InstanceConfig(mockInstance);
+    Map<String, Integer> mockInstanceCapacity =
+        ImmutableMap.of("WCU", 100, "RCU", 100, "STORAGE", 100);
+    instanceConfig.setInstanceCapacityMap(mockInstanceCapacity);
+    admin.addInstance(clusterName, instanceConfig);
+    MockParticipantManager mockParticipantManager =
+        new MockParticipantManager(ZK_ADDR, clusterName, mockInstance);
+    mockParticipantManager.syncStart();
+
+    IdealState idealState = new IdealState(testResourcePrefix);
+    idealState.setNumPartitions(3);
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+
+    ResourceConfig resourceConfig = new ResourceConfig(testResourcePrefix);
+    // validate
+    Map<String, Boolean> validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+        Collections.singletonList(testResourcePrefix));
+    Assert.assertEquals(validationResult.size(), 1);
+    Assert.assertFalse(validationResult.get(testResourcePrefix));
+    try {
+      admin.addResourceWithWeight(clusterName, idealState, resourceConfig);
+      Assert.fail();
+    } catch (HelixException e) {
+      // OK since resourceConfig is empty
+    }
+
+    // Set PARTITION_CAPACITY_MAP
+    Map<String, String> capacityDataMap =
+        ImmutableMap.of("WCU", "1", "RCU", "2", "STORAGE", "3");
+    resourceConfig.getRecord()
+        .setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(),
+            Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+                OBJECT_MAPPER.writeValueAsString(capacityDataMap)));
+
+    // validate
+    validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+        Collections.singletonList(testResourcePrefix));
+    Assert.assertEquals(validationResult.size(), 1);
+    Assert.assertFalse(validationResult.get(testResourcePrefix));
+
+    // Add the capacity key to ClusterConfig
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+    ClusterConfig clusterConfig = dataAccessor.getProperty(keyBuilder.clusterConfig());
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("WCU", "RCU", "STORAGE"));
+    dataAccessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+
+    // Should succeed now
+    Assert.assertTrue(admin.addResourceWithWeight(clusterName, idealState, resourceConfig));
+    // validate
+    validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+        Collections.singletonList(testResourcePrefix));
+    Assert.assertEquals(validationResult.size(), 1);
+    Assert.assertTrue(validationResult.get(testResourcePrefix));
+  }
+
+  /**
+   * Test enableWagedRebalance() by checking that the rebalancer class name changed.
+   */
+  @Test
+  public void testEnableWagedRebalance() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String testResourcePrefix = "TestResource";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+
+    // Add an IdealState
+    IdealState idealState = new IdealState(testResourcePrefix);
+    idealState.setNumPartitions(3);
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    admin.addResource(clusterName, testResourcePrefix, idealState);
+
+    admin.enableWagedRebalance(clusterName, Collections.singletonList(testResourcePrefix));
+    IdealState is = admin.getResourceIdealState(clusterName, testResourcePrefix);
+    Assert.assertEquals(is.getRebalancerClassName(), WagedRebalancer.class.getName());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 6cb7790..4319bfb 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -38,6 +38,7 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 
 public class MockHelixAdmin implements HelixAdmin {
@@ -427,4 +428,25 @@ public class MockHelixAdmin implements HelixAdmin {
   @Override public void close() {
 
   }
+
+  @Override
+  public boolean addResourceWithWeight(String clusterName, IdealState idealState, ResourceConfig resourceConfig) {
+    return false;
+  }
+
+  @Override
+  public boolean enableWagedRebalance(String clusterName, List<String> resourceNames) {
+    return false;
+  }
+
+  @Override
+  public Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName, List<String> resourceNames) {
+    return null;
+  }
+
+  @Override
+  public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+      List<String> instancesNames) {
+    return null;
+  }
 }