You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 22:21:47 UTC

helix git commit: Separate CardDealingAdjustmentAlgorithm for constraint based rebalance strategy.

Repository: helix
Updated Branches:
  refs/heads/master 4c7661017 -> 21a0922ab


Separate CardDealingAdjustmentAlgorithm for constraint based rebalance strategy.

To keep the CRUSH-ed strategy stable, we decided to separate out the logic changes made for the constraint based rebalance strategy.
After this change, the V2 CardDealingAdjustmentAlgorithm will be used for future strategies.
Please keep the V1 implementation untouched to avoid unexpected partition movement.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/21a0922a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/21a0922a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/21a0922a

Branch: refs/heads/master
Commit: 21a0922abb9fcccb1fcb4d792d5f51f677e1f49c
Parents: 4c76610
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue May 15 10:50:46 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Mon Jul 9 15:19:06 2018 -0700

----------------------------------------------------------------------
 ...stractEvenDistributionRebalanceStrategy.java |   8 +-
 .../strategy/ConstraintRebalanceStrategy.java   |  20 +-
 .../strategy/crushMapping/CardDealer.java       |   8 +
 .../CardDealingAdjustmentAlgorithm.java         |  44 ++--
 .../CardDealingAdjustmentAlgorithmV2.java       | 228 +++++++++++++++++++
 5 files changed, 264 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/21a0922a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index a7c870b..1b1dfd6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.rebalancer.strategy;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer;
 import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm;
 import org.apache.helix.controller.rebalancer.strategy.crushMapping.ConsistentHashingAdjustmentAlgorithm;
 import org.apache.helix.controller.rebalancer.topology.Topology;
@@ -44,10 +45,9 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal
 
   protected abstract RebalanceStrategy getBaseRebalanceStrategy();
 
-  protected CardDealingAdjustmentAlgorithm getCardDealingAlgorithm(Topology topology) {
+  protected CardDealer getCardDealingAlgorithm(Topology topology) {
     // by default, minimize the movement when calculating for evenness.
-    return new CardDealingAdjustmentAlgorithm(topology, _replica,
-        CardDealingAdjustmentAlgorithm.Mode.MINIMIZE_MOVEMENT);
+    return new CardDealingAdjustmentAlgorithm(topology, _replica);
   }
 
   @Override
@@ -88,7 +88,7 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal
       // Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution.
       Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(),
           clusterData.getClusterConfig());
-      CardDealingAdjustmentAlgorithm cardDealer = getCardDealingAlgorithm(allNodeTopo);
+      CardDealer cardDealer = getCardDealingAlgorithm(allNodeTopo);
 
       if (cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
         // Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform.

http://git-wip-us.apache.org/repos/asf/helix/blob/21a0922a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
index fe3c89d..91b8e6b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
@@ -27,7 +27,8 @@ import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
 import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
 import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
-import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealer;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2;
 import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.model.InstanceConfig;
@@ -82,7 +83,6 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
   /**
    * This strategy is currently for rebalance tool only.
    * For the constructor defined for AutoRebalancer, use a simplified default constraint to ensure balance.
-   *
    * Note this strategy will flip-flop almost for sure if directly used in the existing rebalancer.
    * TODO Enable different constraints for automatic rebalance process in the controller later.
    */
@@ -108,11 +108,11 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
     _softConstraints.add(defaultConstraint);
   }
 
-  protected CardDealingAdjustmentAlgorithm getCardDealingAlgorithm(Topology topology) {
+  protected CardDealer getCardDealingAlgorithm(Topology topology) {
     // For constraint based strategy, need more fine-grained assignment for each partition.
     // So evenness is more important.
-    return new CardDealingAdjustmentAlgorithm(topology, _replica,
-        CardDealingAdjustmentAlgorithm.Mode.EVENNESS);
+    return new CardDealingAdjustmentAlgorithmV2(topology, _replica,
+        CardDealingAdjustmentAlgorithmV2.Mode.EVENNESS);
   }
 
   @Override
@@ -133,10 +133,10 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
   /**
    * Generate assignment based on the constraints.
    *
-   * @param allNodes         All instances
-   * @param liveNodes        List of live instances
-   * @param currentMapping   current replica mapping. Will directly use this mapping if it meets state model requirement
-   * @param clusterData      cluster data
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping. Will directly use this mapping if it meets state model requirement
+   * @param clusterData    cluster data
    * @return IdeaState node that contains both preference list and a proposed state mapping.
    * @throws HelixException
    */
@@ -213,7 +213,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
   /**
    * @param actualMapping
    * @return a filtered state mapping that fit state model definition.
-   *          Or null if the input mapping is conflict with state model.
+   * Or null if the input mapping is conflict with state model.
    */
   private Map<String, String> validateStateMap(Map<String, String> actualMapping) {
     Map<String, String> filteredStateMapping = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/helix/blob/21a0922a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java
new file mode 100644
index 0000000..2f76577
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealer.java
@@ -0,0 +1,8 @@
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import java.util.List;
+import java.util.Map;
+
+public interface CardDealer {
+  boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/21a0922a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
index ec3adcf..3e523e9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
@@ -5,15 +5,9 @@ import org.apache.helix.controller.rebalancer.topology.Topology;
 
 import java.util.*;
 
-public class CardDealingAdjustmentAlgorithm {
+public class CardDealingAdjustmentAlgorithm implements CardDealer {
   private static int MAX_ADJUSTMENT = 2;
 
-  public enum Mode {
-    MINIMIZE_MOVEMENT,
-    EVENNESS
-  }
-
-  private Mode _mode;
   private int _replica;
   // Instance -> FaultZone Tag
   private Map<String, String> _instanceFaultZone = new HashMap<>();
@@ -23,8 +17,7 @@ public class CardDealingAdjustmentAlgorithm {
   // Record existing partitions that are assigned to a fault zone
   private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>();
 
-  public CardDealingAdjustmentAlgorithm(Topology topology, int replica, Mode mode) {
-    _mode = mode;
+  public CardDealingAdjustmentAlgorithm(Topology topology, int replica) {
     _replica = replica;
     // Get all instance related information.
     for (Node zone : topology.getFaultZones()) {
@@ -71,19 +64,15 @@ public class CardDealingAdjustmentAlgorithm {
       targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions);
     }
 
-    int totalOverflows = 0;
+    // Calculate the expected spikes
+    // Assign spikes to each zone according to zone weight
+    int totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
     Map<String, Integer> maxZoneOverflows = new HashMap<>();
-    if (_mode.equals(Mode.MINIMIZE_MOVEMENT)) {
-      // Calculate the expected spikes
-      // Assign spikes to each zone according to zone weight
-      totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
-      for (String faultZoneName : _faultZoneWeight.keySet()) {
-        float zoneWeight = _faultZoneWeight.get(faultZoneName);
-        maxZoneOverflows.put(faultZoneName,
-            (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
-      }
+    for (String faultZoneName : _faultZoneWeight.keySet()) {
+      float zoneWeight = _faultZoneWeight.get(faultZoneName);
+      maxZoneOverflows.put(faultZoneName,
+          (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
     }
-    // Note that keep the spikes if possible will hurt evenness. So only do this for MINIMIZE_MOVEMENT mode
 
     Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator();
     while (nodeIter.hasNext()) {
@@ -108,8 +97,8 @@ public class CardDealingAdjustmentAlgorithm {
       List<String> partitions = nodeToPartitionMap.get(instance);
       int target = (int) (Math.floor(targetPartitionCount.get(instance)));
       if (partitions.size() > target) {
-        Integer maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
-        if (maxZoneOverflow != null && maxZoneOverflow > 0 && totalOverflows > 0) {
+        int maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
+        if (maxZoneOverflow > 0 && totalOverflows > 0) {
           // When fault zone has overflow capacity AND there are still remaining overflow partitions
           target = (int) (Math.ceil(targetPartitionCount.get(instance)));
           maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1);
@@ -142,7 +131,7 @@ public class CardDealingAdjustmentAlgorithm {
   private void partitionDealing(Collection<String> instances,
       TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap,
       Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap,
-      final Map<String, Float> targetPartitionCount, final int randomSeed, final int targetAdjustment) {
+      Map<String, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) {
     PriorityQueue<String> instanceQueue =
         new PriorityQueue<>(instances.size(), new Comparator<String>() {
           @Override
@@ -150,13 +139,8 @@ public class CardDealingAdjustmentAlgorithm {
             int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0;
             int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0;
             if (node1Load == node2Load) {
-              Float node1Target = targetPartitionCount.get(node1);
-              Float node2Target = targetPartitionCount.get(node2);
-              if (node1Target == node2Target) {
-                return new Integer((node1 + randomSeed).hashCode()).compareTo((node2 + randomSeed).hashCode());
-              } else {
-                return node2Target.compareTo(node1Target);
-              }
+              return new Integer((node1 + randomSeed).hashCode())
+                  .compareTo((node2 + randomSeed).hashCode());
             } else {
               return node1Load - node2Load;
             }

http://git-wip-us.apache.org/repos/asf/helix/blob/21a0922a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java
new file mode 100644
index 0000000..430e0a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java
@@ -0,0 +1,228 @@
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+
+import java.util.*;
+
+public class CardDealingAdjustmentAlgorithmV2 implements CardDealer {
+  private static int MAX_ADJUSTMENT = 2;
+
+  public enum Mode {
+    MINIMIZE_MOVEMENT,
+    EVENNESS
+  }
+
+  private Mode _mode;
+  private int _replica;
+  // Instance -> FaultZone Tag
+  private Map<String, String> _instanceFaultZone = new HashMap<>();
+  private Map<String, Long> _instanceWeight = new HashMap<>();
+  private long _totalWeight = 0;
+  private Map<String, Long> _faultZoneWeight = new HashMap<>();
+  // Record existing partitions that are assigned to a fault zone
+  private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>();
+
+  public CardDealingAdjustmentAlgorithmV2(Topology topology, int replica, Mode mode) {
+    _mode = mode;
+    _replica = replica;
+    // Get all instance related information.
+    for (Node zone : topology.getFaultZones()) {
+      _faultZoneWeight.put(zone.getName(), zone.getWeight());
+      if (!_faultZonePartitionMap.containsKey(zone.getName())) {
+        _faultZonePartitionMap.put(zone.getName(), new HashSet<String>());
+      }
+      for (Node instance : Topology.getAllLeafNodes(zone)) {
+        if (!instance.isFailed()) {
+          _instanceWeight.put(instance.getName(), instance.getWeight());
+          _totalWeight += instance.getWeight();
+          _instanceFaultZone.put(instance.getName(), zone.getName());
+        }
+      }
+    }
+  }
+
+  public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) {
+    // Records exceed partitions
+    TreeMap<String, Integer> toBeReassigned = new TreeMap<>();
+
+    // Calculate total partitions that need to be calculated
+    long totalReplicaCount = 0;
+    for (List<String> partitions : nodeToPartitionMap.values()) {
+      totalReplicaCount += partitions.size();
+    }
+    if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) {
+      return false;
+    }
+
+    // instance -> target (ideal) partition count
+    Map<String, Float> targetPartitionCount = new HashMap<>();
+    for (String liveInstance : _instanceFaultZone.keySet()) {
+      long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance));
+      float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight;
+      // 1. if replica = fault zone, fault zone weight does not count, so calculate according to fault zone count.
+      // 2. else, should consider fault zone weight to calculate expected threshold.
+      float zonePartitions;
+      if (_replica == _faultZoneWeight.size()) {
+        zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size();
+      } else {
+        zonePartitions = ((float) totalReplicaCount) * zoneWeight / _totalWeight;
+      }
+      targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions);
+    }
+
+    int totalOverflows = 0;
+    Map<String, Integer> maxZoneOverflows = new HashMap<>();
+    if (_mode.equals(Mode.MINIMIZE_MOVEMENT)) {
+      // Calculate the expected spikes
+      // Assign spikes to each zone according to zone weight
+      totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
+      for (String faultZoneName : _faultZoneWeight.keySet()) {
+        float zoneWeight = _faultZoneWeight.get(faultZoneName);
+        maxZoneOverflows.put(faultZoneName,
+            (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
+      }
+    }
+    // Note that keep the spikes if possible will hurt evenness. So only do this for MINIMIZE_MOVEMENT mode
+
+    Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator();
+    while (nodeIter.hasNext()) {
+      String instance = nodeIter.next();
+      // Cleanup the existing mapping. Remove all non-active nodes from the mapping
+      if (!_instanceFaultZone.containsKey(instance)) {
+        List<String> partitions = nodeToPartitionMap.get(instance);
+        addToReAssignPartition(toBeReassigned, partitions);
+        partitions.clear();
+        nodeIter.remove();
+      }
+    }
+
+    List<String> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet());
+    // Different resource should shuffle nodes in different ways.
+    Collections.shuffle(orderedInstances, new Random(randomSeed));
+    for (String instance : orderedInstances) {
+      if (!nodeToPartitionMap.containsKey(instance)) {
+        continue;
+      }
+      // Cut off the exceed partitions compared with target partition count.
+      List<String> partitions = nodeToPartitionMap.get(instance);
+      int target = (int) (Math.floor(targetPartitionCount.get(instance)));
+      if (partitions.size() > target) {
+        Integer maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
+        if (maxZoneOverflow != null && maxZoneOverflow > 0 && totalOverflows > 0) {
+          // When fault zone has overflow capacity AND there are still remaining overflow partitions
+          target = (int) (Math.ceil(targetPartitionCount.get(instance)));
+          maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1);
+          totalOverflows--;
+        }
+
+        // Shuffle partitions to randomly pickup exceed ones. Ensure the algorithm generates consistent results when the inputs are the same.
+        Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + randomSeed));
+        addToReAssignPartition(toBeReassigned, partitions.subList(target, partitions.size()));
+
+        // Put the remaining partitions to the assignment, and record in fault zone partition list
+        List<String> remainingPartitions = new ArrayList<>(partitions.subList(0, target));
+        partitions.clear();
+        nodeToPartitionMap.put(instance, remainingPartitions);
+      }
+      _faultZonePartitionMap.get(_instanceFaultZone.get(instance))
+          .addAll(nodeToPartitionMap.get(instance));
+    }
+
+    // Reassign if any instances have space left.
+    // Assign partition according to the target capacity, CAP at "Math.floor(target) + adjustment"
+    int adjustment = 0;
+    while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) {
+      partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, _faultZonePartitionMap,
+          _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, randomSeed, adjustment++);
+    }
+    return toBeReassigned.isEmpty();
+  }
+
+  private void partitionDealing(Collection<String> instances,
+      TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap,
+      Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap,
+      final Map<String, Float> targetPartitionCount, final int randomSeed, final int targetAdjustment) {
+    PriorityQueue<String> instanceQueue =
+        new PriorityQueue<>(instances.size(), new Comparator<String>() {
+          @Override
+          public int compare(String node1, String node2) {
+            int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0;
+            int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0;
+            if (node1Load == node2Load) {
+              Float node1Target = targetPartitionCount.get(node1);
+              Float node2Target = targetPartitionCount.get(node2);
+              if (node1Target == node2Target) {
+                return new Integer((node1 + randomSeed).hashCode()).compareTo((node2 + randomSeed).hashCode());
+              } else {
+                return node2Target.compareTo(node1Target);
+              }
+            } else {
+              return node1Load - node2Load;
+            }
+          }
+        });
+    instanceQueue.addAll(instances);
+
+    while (!toBeReassigned.isEmpty()) {
+      boolean anyPartitionAssigned = false;
+      Iterator<String> instanceIter = instanceQueue.iterator();
+      while (instanceIter.hasNext()) {
+        String instance = instanceIter.next();
+        // Temporary remove the node from queue.
+        // If any partition assigned to the instance, add it back to reset priority.
+        instanceIter.remove();
+        boolean partitionAssignedToInstance = false;
+        String faultZoneStr = faultZoneMap.get(instance);
+        List<String> partitions = assignmentMap.containsKey(instance) ?
+            assignmentMap.get(instance) :
+            new ArrayList<String>();
+        int space =
+            (int) (Math.floor(targetPartitionCount.get(instance))) + targetAdjustment - partitions
+                .size();
+        if (space > 0) {
+          // Find a pending partition to locate
+          for (String pendingPartition : toBeReassigned.navigableKeySet()) {
+            if (!faultZonePartitionMap.get(faultZoneStr).contains(pendingPartition)) {
+              if (!assignmentMap.containsKey(instance)) {
+                assignmentMap.put(instance, partitions);
+              }
+              partitions.add(pendingPartition);
+              faultZonePartitionMap.get(faultZoneStr).add(pendingPartition);
+              if (toBeReassigned.get(pendingPartition) == 1) {
+                toBeReassigned.remove(pendingPartition);
+              } else {
+                toBeReassigned.put(pendingPartition, toBeReassigned.get(pendingPartition) - 1);
+              }
+              // if any assignment is made:
+              // this instance can hold more partitions in the future
+              partitionAssignedToInstance = true;
+              break;
+            }
+          }
+        }
+        if (partitionAssignedToInstance) {
+          // Reset priority in the queue
+          instanceQueue.add(instance);
+          anyPartitionAssigned = true;
+          break;
+        }
+      }
+      if (!anyPartitionAssigned) {
+        // if no pending partition is assigned to any instances in this loop, new assignment is not possible
+        break;
+      }
+    }
+  }
+
+  private void addToReAssignPartition(TreeMap<String, Integer> toBeReassigned,
+      List<String> partitions) {
+    for (String partition : partitions) {
+      if (!toBeReassigned.containsKey(partition)) {
+        toBeReassigned.put(partition, 1);
+      } else {
+        toBeReassigned.put(partition, toBeReassigned.get(partition) + 1);
+      }
+    }
+  }
+}