You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:23 UTC

[helix] 20/37: HardConstraints Implementation and unit tests (#433)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 43d3476918b476985542cd75f4c76ea2fc60ca74
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Mon Sep 9 13:57:28 2019 -0700

    HardConstraints Implementation and unit tests (#433)
    
    * Implement all of the basic hard constraints
    1. Partition count cannot exceed the instance's upper limit
    2. Fault zone aware (no two replicas of the same partition in the same zone)
    3. Partition weight cannot exceed the instance's capacity
    4. Cannot assign inactive partitions
    5. Replicas of the same partition in different states cannot co-exist on one instance
    6. Instance must have the tag of the replica
---
 .../constraints/FaultZoneAwareConstraint.java      | 43 ++++++++++++
 .../waged/constraints/NodeCapacityConstraint.java  | 50 ++++++++++++++
 .../NodeMaxPartitionLimitConstraint.java           | 40 +++++++++++
 .../constraints/ReplicaActivateConstraint.java     | 41 +++++++++++
 .../SamePartitionOnInstanceConstraint.java         | 39 +++++++++++
 .../waged/constraints/ValidGroupTagConstraint.java | 41 +++++++++++
 .../rebalancer/waged/model/AssignableNode.java     | 75 +++++++++++---------
 .../rebalancer/waged/model/AssignableReplica.java  |  4 ++
 .../constraints/TestFaultZoneAwareConstraint.java  | 79 ++++++++++++++++++++++
 .../constraints/TestNodeCapacityConstraint.java    | 54 +++++++++++++++
 .../TestNodeMaxPartitionLimitConstraint.java       | 56 +++++++++++++++
 .../TestPartitionActivateConstraint.java           | 64 ++++++++++++++++++
 .../TestSamePartitionOnInstanceConstraint.java     | 59 ++++++++++++++++
 .../constraints/TestValidGroupTagConstraint.java   | 66 ++++++++++++++++++
 14 files changed, 679 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java
new file mode 100644
index 0000000..c33419e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java
@@ -0,0 +1,43 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class FaultZoneAwareConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    if (!node.hasFaultZone()) {
+      return true;
+    }
+    return !clusterContext
+        .getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone())
+        .contains(replica.getPartitionName());
+  }
+
+  @Override
+  String getDescription() {
+    return "A fault zone cannot contain more than 1 replica of same partition";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
new file mode 100644
index 0000000..5fc2faf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
@@ -0,0 +1,50 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+/**
+ * Hard constraint: the node's current capacity must cover what the replica requires.
+ */
+class NodeCapacityConstraint extends HardConstraint {
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    Map<String, Integer> remaining = node.getCurrentCapacity();
+    Map<String, Integer> required = replica.getCapacity();
+
+    // Only keys present on both sides are compared; a capacity key that the node does not
+    // report is skipped rather than treated as a violation.
+    return required.entrySet().stream()
+        .noneMatch(demand -> remaining.containsKey(demand.getKey())
+            && remaining.get(demand.getKey()) < demand.getValue());
+  }
+
+  @Override
+  String getDescription() {
+    // Short summary of the rule enforced by this constraint.
+    return "Node has insufficient capacity";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
new file mode 100644
index 0000000..9d0752b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class NodeMaxPartitionLimitConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    return node.getAssignedReplicaCount() < node.getMaxPartition()
+        && node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
+            .getResourceMaxPartitionsPerInstance();
+  }
+
+  @Override
+  String getDescription() {
+    return "Cannot exceed the maximum number of partitions limitation on node";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java
new file mode 100644
index 0000000..9152efe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java
@@ -0,0 +1,41 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class ReplicaActivateConstraint extends HardConstraint {
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    // A resource with no entry in the node's disabled-partition map has nothing disabled.
+    List<String> disabled = node.getDisabledPartitionsMap().get(replica.getResourceName());
+    return !(disabled != null && disabled.contains(replica.getPartitionName()));
+  }
+
+  @Override
+  String getDescription() {
+    return "Cannot assign the inactive replica";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java
new file mode 100644
index 0000000..202e49a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java
@@ -0,0 +1,39 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+class SamePartitionOnInstanceConstraint extends HardConstraint {
+
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    return !node.getAssignedPartitionsByResource(replica.getResourceName())
+        .contains(replica.getPartitionName());
+  }
+
+  @Override
+  String getDescription() {
+    return "Same partition of different states cannot co-exist in one instance";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java
new file mode 100644
index 0000000..e31864f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java
@@ -0,0 +1,41 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+/** Hard constraint: a replica that carries an instance group tag needs a node with that tag. */
+class ValidGroupTagConstraint extends HardConstraint {
+  @Override
+  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    // A replica without an instance group tag is unrestricted; otherwise the node must
+    // carry the replica's tag.
+    return !replica.hasResourceInstanceGroupTag()
+        || node.getInstanceTags().contains(replica.getResourceInstanceGroupTag());
+  }
+
+  @Override
+  String getDescription() {
+    return "Instance doesn't have the tag of the replica";
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 4141d20..f25c289 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import static java.lang.Math.max;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,8 +37,6 @@ import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.Math.max;
-
 /**
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
@@ -52,7 +52,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Integer> _maxCapacity;
   private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
 
-  // A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the node.
+  // A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the
+  // node.
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _currentCapacityMap;
@@ -78,13 +79,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
   }
 
   /**
-   * Update the node with a ClusterDataCache. This resets the current assignment and recalculates currentCapacity.
-   * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be
-   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+   * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
+   * currentCapacity.
+   * NOTE: While this is required to be used in the constructor, this can also be used when the
+   * clusterCache needs to be
+   * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and
+   * ResourceConfig could
    * subject to change. If the assumption is no longer true, this function should become private.
-   *
-   * @param clusterConfig      - the Cluster Config of the cluster where the node is located
-   * @param instanceConfig     - the Instance Config of the node
+   * @param clusterConfig - the Cluster Config of the cluster where the node is located
+   * @param instanceConfig - the Instance Config of the node
    * @param existingAssignment - all the existing replicas that are current assigned to the node
    */
   private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
@@ -104,7 +107,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
 
   /**
    * Assign a replica to the node.
-   *
    * @param assignableReplica - the replica to be assigned
    */
   void assign(AssignableReplica assignableReplica) {
@@ -116,7 +118,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
   /**
    * Release a replica from the node.
    * If the replication is not on this node, the assignable node is not updated.
-   *
    * @param replica - the replica to be released
    */
   void release(AssignableReplica replica) throws IllegalArgumentException {
@@ -131,8 +132,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName);
-    if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName)
-        .equals(replica)) {
+    if (!partitionMap.containsKey(partitionName)
+        || !partitionMap.get(partitionName).equals(replica)) {
       LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.",
           replica.toString(), getInstanceName());
       return;
@@ -174,7 +175,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
 
   /**
    * @param resource Resource name
-   * @return A set of the current assigned replicas' partition names with the top state in the specified resource.
+   * @return A set of the current assigned replicas' partition names with the top state in the
+   *         specified resource.
    */
   public Set<String> getAssignedTopStatePartitionsByResource(String resource) {
     return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).entrySet()
@@ -194,7 +196,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   /**
    * @return The total count of assigned replicas.
    */
-  public long getAssignedReplicaCount() {
+  public int getAssignedReplicaCount() {
     return _currentAssignedReplicaMap.values().stream().mapToInt(Map::size).sum();
   }
 
@@ -207,7 +209,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
 
   /**
    * Return the most concerning capacity utilization number for evenly partition assignment.
-   * The method dynamically returns the highest utilization number among all the capacity categories.
+   * The method dynamically returns the highest utilization number among all the capacity
+   * categories.
    * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
    * return 0.9.
    *
@@ -229,15 +232,21 @@ public class AssignableNode implements Comparable<AssignableNode> {
     return _faultZone;
   }
 
+  public boolean hasFaultZone() {
+    return _faultZone != null; // a null fault zone means none is set for this node
+  }
+
   /**
-   * @return A map of <resource name, set of partition names> contains all the partitions that are disabled on the node.
+   * @return A map of <resource name, set of partition names> contains all the partitions that are
+   *         disabled on the node.
    */
   public Map<String, List<String>> getDisabledPartitionsMap() {
     return _disabledPartitionsMap;
   }
 
   /**
-   * @return A map of <capacity category, capacity number> that describes the max capacity of the node.
+   * @return A map of <capacity category, capacity number> that describes the max capacity of the
+   *         node.
    */
   public Map<String, Integer> getMaxCapacity() {
     return _maxCapacity;
@@ -251,8 +260,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
   }
 
   /**
-   * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when
-   * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2".
+   * Computes the fault zone id based on the domain and fault zone type when topology is enabled.
+   * For example, when
+   * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function
+   * returns "2".
    * If cannot find the fault zone id, this function leaves the fault zone id as the instance name.
    * TODO merge this logic with Topology.java tree building logic.
    * For now, the WAGED rebalancer has a more strict topology def requirement.
@@ -267,8 +278,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
       }
 
       String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0 || Arrays.stream(topologyDef)
-          .noneMatch(type -> type.equals(faultZoneType))) {
+      if (topologyDef.length == 0
+          || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain the fault zone type.");
       }
@@ -304,7 +315,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
   }
 
   /**
-   * This function should only be used to assign a set of new partitions that are not allocated on this node.
+   * This function should only be used to assign a set of new partitions that are not allocated on
+   * this node.
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
   private void assignNewBatch(Collection<AssignableReplica> replicas) {
@@ -314,9 +326,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
       // increment the capacity requirement according to partition's capacity configuration.
       for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
         totalPartitionCapacity.compute(capacity.getKey(),
-            (key, totalValue) -> (totalValue == null) ?
-                capacity.getValue() :
-                totalValue + capacity.getValue());
+            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                : totalValue + capacity.getValue());
       }
     }
 
@@ -332,12 +343,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private void addToAssignmentRecord(AssignableReplica replica) {
     String resourceName = replica.getResourceName();
     String partitionName = replica.getPartitionName();
-    if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
-        .get(resourceName).containsKey(partitionName)) {
-      throw new HelixException(String
-          .format("Resource %s already has a replica with state %s from partition %s on node %s",
-              replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
-              getInstanceName()));
+    if (_currentAssignedReplicaMap.containsKey(resourceName)
+        && _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
+      throw new HelixException(String.format(
+          "Resource %s already has a replica with state %s from partition %s on node %s",
+          replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
+          getInstanceName()));
     } else {
       _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
           .put(partitionName, replica);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 537bf70..66bd7b7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -87,6 +87,10 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
     return _resourceInstanceGroupTag;
   }
 
+  public boolean hasResourceInstanceGroupTag() {
+    // An empty tag is treated the same as no tag at all.
+    return _resourceInstanceGroupTag != null && !_resourceInstanceGroupTag.isEmpty();
+  }
+
   public int getResourceMaxPartitionsPerInstance() {
     return _resourceMaxPartitionsPerInstance;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java
new file mode 100644
index 0000000..9d2cb14
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class TestFaultZoneAwareConstraint {
+  private static final String TEST_PARTITION = "testPartition";
+  private static final String TEST_ZONE = "testZone";
+  private static final String TEST_RESOURCE = "testResource";
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _faultZoneAwareConstraint = new FaultZoneAwareConstraint();
+
+  @BeforeMethod
+  public void init() {
+    // Every test evaluates the same replica against a node located in TEST_ZONE.
+    when(_testNode.getFaultZone()).thenReturn(TEST_ZONE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+  }
+
+  @Test
+  public void inValidWhenFaultZoneAlreadyAssigned() {
+    when(_testNode.hasFaultZone()).thenReturn(true);
+    when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, TEST_ZONE))
+        .thenReturn(ImmutableSet.of(TEST_PARTITION));
+    Assert.assertFalse(isAssignmentValid());
+  }
+
+  @Test
+  public void validWhenEmptyAssignment() {
+    when(_testNode.hasFaultZone()).thenReturn(true);
+    when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, TEST_ZONE))
+        .thenReturn(Collections.emptySet());
+    Assert.assertTrue(isAssignmentValid());
+  }
+
+  @Test
+  public void validWhenNoFaultZone() {
+    when(_testNode.hasFaultZone()).thenReturn(false);
+    Assert.assertTrue(isAssignmentValid());
+  }
+
+  /** Runs the constraint under test against the shared mock node, replica and context. */
+  private boolean isAssignmentValid() {
+    return _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
new file mode 100644
index 0000000..511f881
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
@@ -0,0 +1,54 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+/** Checks NodeCapacityConstraint with a single capacity key on mocked node and replica. */
+public class TestNodeCapacityConstraint {
+  private static final String CAPACITY_KEY = "testKey";
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new NodeCapacityConstraint();
+
+  @Test
+  public void testConstraintValidWhenNodeHasEnoughSpace() {
+    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(CAPACITY_KEY, 10));
+    when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(CAPACITY_KEY, 5));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+
+  @Test
+  public void testConstraintInValidWhenNodeHasInsufficientSpace() {
+    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(CAPACITY_KEY, 1));
+    when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(CAPACITY_KEY, 5));
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java
new file mode 100644
index 0000000..4cb7466
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java
@@ -0,0 +1,56 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link NodeMaxPartitionLimitConstraint}. The node, replica and cluster context
+ * are Mockito mocks, so only the values stubbed in each test are meaningful.
+ */
+public class TestNodeMaxPartitionLimitConstraint {
+  private static final String TEST_RESOURCE = "TestResource";
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new NodeMaxPartitionLimitConstraint();
+
+  /**
+   * Expects acceptance: the node holds 0 replicas against a limit of 10, and holds no partition
+   * of the replica's resource against a per-instance limit of 5.
+   */
+  @Test
+  public void testConstraintValid() {
+    stubResourceLimitSatisfied();
+    when(_testNode.getAssignedReplicaCount()).thenReturn(0);
+    when(_testNode.getMaxPartition()).thenReturn(10);
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+
+  /**
+   * Expects rejection: the node already holds 10 replicas against a limit of 5. The per-resource
+   * inputs are stubbed with satisfiable values so that the node-level limit is the only stubbed
+   * condition that is violated.
+   */
+  @Test
+  public void testConstraintInvalid() {
+    stubResourceLimitSatisfied();
+    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+    when(_testNode.getMaxPartition()).thenReturn(5);
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+
+  /**
+   * Stubs the per-resource inputs: the replica belongs to {@link #TEST_RESOURCE}, the node holds
+   * no partition of that resource, and the resource allows up to 5 partitions per instance.
+   */
+  private void stubResourceLimitSatisfied() {
+    // An unstubbed mock returns null for the resource name, which would not match the
+    // TEST_RESOURCE-keyed node stub below.
+    // NOTE(review): presumably the constraint looks up assigned partitions through
+    // replica.getResourceName() - confirm against NodeMaxPartitionLimitConstraint.
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(Collections.emptySet());
+    when(_testReplica.getResourceMaxPartitionsPerInstance()).thenReturn(5);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java
new file mode 100644
index 0000000..ecfdaa2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Unit tests for {@link ReplicaActivateConstraint}. The node, replica and cluster context are
+ * Mockito mocks; the replica always reports {@link #TEST_RESOURCE} / {@link #TEST_PARTITION}.
+ */
+public class TestPartitionActivateConstraint {
+  private static final String TEST_PARTITION = "TestPartition";
+  private static final String TEST_RESOURCE = "TestResource";
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new ReplicaActivateConstraint();
+
+  /**
+   * Expects acceptance when the node's disabled list for the replica's resource is empty, and
+   * when it only contains a different partition.
+   */
+  @Test
+  public void testConstraintValid() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    // The map is keyed by the resource name the replica reports, so the entry is the one found
+    // for this replica. NOTE(review): assumes the constraint looks the map up by resource name;
+    // confirm against ReplicaActivateConstraint.
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_RESOURCE, Collections.emptyList()));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+    // A disabled partition other than the replica's own must not block the assignment.
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_RESOURCE, ImmutableList.of("dummy")));
+    Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+
+  /**
+   * Expects rejection when the node lists the replica's own partition as disabled under the
+   * replica's resource.
+   */
+  @Test
+  public void testConstraintInvalidWhenReplicaIsDisabled() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(_testNode.getDisabledPartitionsMap())
+        .thenReturn(ImmutableMap.of(TEST_RESOURCE, ImmutableList.of(TEST_PARTITION)));
+    // A disabled replica must be refused, hence assertFalse.
+    Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java
new file mode 100644
index 0000000..50b0c03
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java
@@ -0,0 +1,59 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Unit tests for {@link SamePartitionOnInstanceConstraint}. The node, replica and cluster
+ * context are Mockito mocks; the replica always reports {@link #TEST_RESOURCE} and
+ * {@link #TEST_PARTITION}.
+ */
+public class TestSamePartitionOnInstanceConstraint {
+  private static final String TEST_RESOURCE = "TestResource";
+  private static final String TEST_PARTITION = TEST_RESOURCE + "0";
+
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new SamePartitionOnInstanceConstraint();
+
+  /** Expects acceptance when the node only holds a different partition of the resource. */
+  @Test
+  public void testConstraintValid() {
+    stubReplicaIdentity();
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(ImmutableSet.of("dummy"));
+
+    boolean valid = _constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+    Assert.assertTrue(valid);
+  }
+
+  /** Expects rejection when the node already holds the replica's partition of the resource. */
+  @Test
+  public void testConstraintInValid() {
+    stubReplicaIdentity();
+    when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE))
+        .thenReturn(ImmutableSet.of(TEST_PARTITION));
+
+    boolean valid = _constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+    Assert.assertFalse(valid);
+  }
+
+  /** Makes the mocked replica report the test resource and partition names. */
+  private void stubReplicaIdentity() {
+    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
+    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java
new file mode 100644
index 0000000..8d02b3d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java
@@ -0,0 +1,66 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Unit tests for {@link ValidGroupTagConstraint}. The node, replica and cluster context are
+ * Mockito mocks; each test stubs the replica's group tag and, where relevant, the node's tags.
+ */
+public class TestValidGroupTagConstraint {
+  private static final String TEST_TAG = "testTag";
+
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+  private final HardConstraint _constraint = new ValidGroupTagConstraint();
+
+  /** Expects acceptance when the node's tag set contains the replica's group tag. */
+  @Test
+  public void testConstraintValid() {
+    stubTaggedReplica();
+    when(_testNode.getInstanceTags()).thenReturn(ImmutableSet.of(TEST_TAG));
+
+    boolean valid = _constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+    Assert.assertTrue(valid);
+  }
+
+  /** Expects rejection when the replica has a group tag but the node has no tags at all. */
+  @Test
+  public void testConstraintInValid() {
+    stubTaggedReplica();
+    when(_testNode.getInstanceTags()).thenReturn(Collections.emptySet());
+
+    boolean valid = _constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+    Assert.assertFalse(valid);
+  }
+
+  /** Expects acceptance when the replica reports that it has no group tag. */
+  @Test
+  public void testConstraintWhenReplicaHasNoTag() {
+    when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(false);
+
+    boolean valid = _constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext);
+    Assert.assertTrue(valid);
+  }
+
+  /** Makes the mocked replica report that it carries {@link #TEST_TAG} as its group tag. */
+  private void stubTaggedReplica() {
+    when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(true);
+    when(_testReplica.getResourceInstanceGroupTag()).thenReturn(TEST_TAG);
+  }
+}