Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[23/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
new file mode 100644
index 0000000..a7d116c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
@@ -0,0 +1,788 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+/**
+ * IdealStateCalculatorForStorageNode tries to optimally allocate master/slave partitions among
+ * Espresso storage nodes.
+ *
+ * Given a batch of storage nodes, the number of partitions and the replication factor, the
+ * algorithm first generates an initial ideal state. When new batches of storage nodes are added,
+ * the algorithm calculates the new ideal state such that the total number of partition movements
+ * is minimized.
+ */
+public class IdealStateCalculatorForStorageNode
+{
+  static final String _MasterAssignmentMap = "MasterAssignmentMap";
+  static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
+  static final String _partitions = "partitions";
+  static final String _replicas = "replicas";
+
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the replication factor
+   * and the number of partitions:
+   *
+   * 1. Calculate the master assignment by random shuffling
+   * 2. For each storage instance, calculate the 1st slave assignment map, by another random shuffling
+   * 3. For each storage instance, calculate the i-th slave assignment map
+   * 4. Combine the i-th slave assignment maps together
+   *
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition
+   * @param resourceName
+   *          name of the resource
+   * @param masterStateValue
+   *          master state value: e.g. "MASTER" or "LEADER"
+   * @param slaveStateValue
+   *          slave state value: e.g. "SLAVE" or "STANDBY"
+   * @return a ZNRecord that contains the ideal state info
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
+                                             String masterStateValue, String slaveStateValue)
+  {
+    Collections.sort(instanceNames);
+    if(instanceNames.size() < replicas + 1)
+    {
+      throw new HelixException("Number of instances must not be less than replicas + 1. "
+                                      + "instanceNr:" + instanceNames.size()
+                                      + ", replicas:" + replicas);
+    }
+    else if(partitions < instanceNames.size())
+    {
+      ZNRecord idealState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
+      int i = 0;
+      for(String partitionId : idealState.getMapFields().keySet())
+      {
+        Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+        List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+        String masterInstance = "";
+        for(String instanceName : partitionAssignmentMap.keySet())
+        {
+          if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+              && masterInstance.equals(""))
+          {
+            masterInstance = instanceName;
+          }
+          else
+          {
+            partitionAssignmentPriorityList.add(instanceName);
+          }
+        }
+        Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
+        partitionAssignmentPriorityList.add(0, masterInstance);
+        idealState.setListField(partitionId, partitionAssignmentPriorityList);
+      }
+      return idealState;
+    }
+
+    Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
+
+    return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+  }
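+
+  // A minimal usage sketch (hypothetical host names and counts; "MASTER"/"SLAVE" are example
+  // state values only). The returned ZNRecord maps each partition to a per-instance state and
+  // carries a master-first priority list:
+  //
+  //   List<String> nodes = new ArrayList<String>();
+  //   nodes.add("localhost_12918"); nodes.add("localhost_12919"); nodes.add("localhost_12920");
+  //   ZNRecord idealState = calculateIdealState(nodes, 12, 2, "MyDB", "MASTER", "SLAVE");
+  //   // idealState.getMapField("MyDB_0")  => {localhost_12918=MASTER, localhost_12919=SLAVE, ...}
+  //   // idealState.getListField("MyDB_0") => [localhost_12918, localhost_12919, localhost_12920]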
+
+  public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
+                                                  String masterStateValue, String slaveStateValue)
+  {
+    Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
+
+    for(int i = 1; i < instanceBatches.size(); i++)
+    {
+      result = calculateNextIdealState(instanceBatches.get(i), result);
+    }
+
+    return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+  }
+
+  /**
+   * Convert the internal result (stored as a Map<String, Object>) into a ZNRecord.
+   */
+  public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
+                                    String masterStateValue, String slaveStateValue)
+  {
+    Map<String, List<Integer>> nodeMasterAssignmentMap
+    = (Map<String, List<Integer>>) (result.get(_MasterAssignmentMap));
+    Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+        = (Map<String, Map<String, List<Integer>>>)(result.get(_SlaveAssignmentMap));
+
+    int partitions = (Integer)(result.get("partitions"));
+
+    ZNRecord idealState = new ZNRecord(resourceName);
+    idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+
+
+    for(String instanceName : nodeMasterAssignmentMap.keySet())
+    {
+      for(Integer partitionId : nodeMasterAssignmentMap.get(instanceName))
+      {
+        String partitionName = resourceName+"_"+partitionId;
+        if(!idealState.getMapFields().containsKey(partitionName))
+        {
+          idealState.setMapField(partitionName, new TreeMap<String, String>());
+        }
+        idealState.getMapField(partitionName).put(instanceName, masterStateValue);
+      }
+    }
+
+    for(String instanceName : nodeSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+
+      for(String slaveNode: slaveAssignmentMap.keySet())
+      {
+        List<Integer> slaveAssignment = slaveAssignmentMap.get(slaveNode);
+        for(Integer partitionId: slaveAssignment)
+        {
+          String partitionName = resourceName+"_"+partitionId;
+          idealState.getMapField(partitionName).put(slaveNode, slaveStateValue);
+        }
+      }
+    }
+    // Generate the priority list of instances per partition. The master is placed at the front,
+    // followed by the slaves.
+
+    for(String partitionId : idealState.getMapFields().keySet())
+    {
+      Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+      List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+      String masterInstance = "";
+      for(String instanceName : partitionAssignmentMap.keySet())
+      {
+        if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+            && masterInstance.equals(""))
+        {
+          masterInstance = instanceName;
+        }
+        else
+        {
+          partitionAssignmentPriorityList.add(instanceName);
+        }
+      }
+      Collections.shuffle(partitionAssignmentPriorityList);
+      partitionAssignmentPriorityList.add(0, masterInstance);
+      idealState.setListField(partitionId, partitionAssignmentPriorityList);
+    }
+    assert(result.containsKey(_replicas));
+    idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get(_replicas).toString());
+    return idealState;
+  }
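+
+  // Illustrative shape of the internal result map consumed above (hypothetical values for
+  // 3 instances, 6 partitions, 1 replica; actual contents depend on the random seeds):
+  //
+  //   "MasterAssignmentMap" : { "node0" : [3, 5], "node1" : [0, 4], "node2" : [1, 2] }
+  //   "SlaveAssignmentMap"  : { "node0" : { "node1" : [3], "node2" : [5] }, ... }
+  //   "partitions"          : 6
+  //   "replicas"            : 1
+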
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the replication factor
+   * and the number of partitions:
+   *
+   * 1. Calculate the master assignment by random shuffling
+   * 2. For each storage instance, calculate the 1st slave assignment map, by another random shuffling
+   * 3. For each storage instance, calculate the i-th slave assignment map
+   * 4. Combine the i-th slave assignment maps together
+   *
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition
+   * @return a map that contains the ideal state info
+   */
+  public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
+  {
+    Random r = new Random(54321);
+    assert(replicas <= instanceNames.size() - 1);
+
+    ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
+    for(int i = 0;i< partitions; i++)
+    {
+      masterPartitionAssignment.add(i);
+    }
+    // shuffle the partition id array
+    Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
+
+    // 1. Generate the random master partition assignment
+    //    instanceName -> List of master partitions on that instance
+    Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
+    for(int i = 0; i < masterPartitionAssignment.size(); i++)
+    {
+      String instanceName = instanceNames.get(i % instanceNames.size());
+      if(!nodeMasterAssignmentMap.containsKey(instanceName))
+      {
+        nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
+      }
+      nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
+    }
+
+    // instanceName -> slave assignment for its master partitions
+    // slave assignment: instanceName -> list of slave partitions on it
+    List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
+
+    Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+    Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+
+    if(replicas > 0)
+    {
+      // 2. For each node, calculate the evenly distributed slave as the first slave assignment
+      // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
+      for(int i = 0; i < instanceNames.size(); i++)
+      {
+        List<String> slaveInstances = new ArrayList<String>();
+        ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
+        TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
+
+        for(int j = 0;j < instanceNames.size(); j++)
+        {
+          if(j != i)
+          {
+            slaveInstances.add(instanceNames.get(j));
+            slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
+          }
+        }
+        // Get the number of master partitions on instanceName
+        List<Integer> masterAssignment =  nodeMasterAssignmentMap.get(instanceNames.get(i));
+        // Do a random shuffling as in step 1, so that the first-level slaves are distributed
+        // among the remaining instances
+
+        for(int j = 0;j < masterAssignment.size(); j++)
+        {
+          slaveAssignment.add(j);
+        }
+        Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
+
+        Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
+
+        // Get the slave assignment map of node instanceName
+        for(int j = 0;j < masterAssignment.size(); j++)
+        {
+          String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
+          if(!slaveAssignmentMap.containsKey(slaveInstanceName))
+          {
+            slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
+          }
+          slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
+        }
+        firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
+      }
+      nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
+      // From the first slave assignment map, calculate the remaining slave assignment maps
+      for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
+      {
+        // calculate the next slave partition assignment map
+        Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
+          = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
+        nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
+      }
+
+      // Combine the calculated 1...replicas-th slave assignment map together
+
+      for(String instanceName : nodeMasterAssignmentMap.keySet())
+      {
+        Map<String, List<Integer>> combinedSlaveAssignmentMap =  new TreeMap<String, List<Integer>>();
+
+        for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+        {
+          Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+
+          for(String slaveInstance : slaveAssignmentMap.keySet())
+          {
+            if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
+            {
+              combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
+            }
+            combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
+          }
+        }
+        migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
+        combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
+      }
+    }
+    /*
+    // Print the result master and slave assignment maps
+    System.out.println("Master assignment:");
+    for(String instanceName : nodeMasterAssignmentMap.keySet())
+    {
+      System.out.println(instanceName+":");
+      for(Integer x : nodeMasterAssignmentMap.get(instanceName))
+      {
+        System.out.print(x+" ");
+      }
+      System.out.println();
+      System.out.println("Slave assignment:");
+
+      int slaveOrder = 1;
+      for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+      {
+        System.out.println("Slave assignment order :" + (slaveOrder++));
+        Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+        for(String slaveName : slaveAssignmentMap.keySet())
+        {
+          System.out.print("\t" + slaveName +":\n\t" );
+          for(Integer x : slaveAssignmentMap.get(slaveName))
+          {
+            System.out.print(x + " ");
+          }
+          System.out.println("\n");
+        }
+      }
+      System.out.println("\nCombined slave assignment map");
+      Map<String, List<Integer>> slaveAssignmentMap = combinedNodeSlaveAssignmentMap.get(instanceName);
+      for(String slaveName : slaveAssignmentMap.keySet())
+      {
+        System.out.print("\t" + slaveName +":\n\t" );
+        for(Integer x : slaveAssignmentMap.get(slaveName))
+        {
+          System.out.print(x + " ");
+        }
+        System.out.println("\n");
+      }
+    }*/
+    Map<String, Object> result = new TreeMap<String, Object>();
+    result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
+    result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
+    result.put("replicas", new Integer(replicas));
+    result.put("partitions", new Integer(partitions));
+    return result;
+  }
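+
+  // Worked example of step 1 (hypothetical numbers): with 3 instances and 8 partitions the
+  // shuffled partition ids are dealt round-robin, so every node ends up with either
+  // floor(8/3) or ceil(8/3) masters, e.g.
+  //
+  //   shuffled ids : [5, 2, 7, 0, 4, 1, 6, 3]
+  //   node0 : [5, 0, 6]   node1 : [2, 4, 3]   node2 : [7, 1]
+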
+  /**
+   * When there is more than one slave, we use the following algorithm to calculate the n-th slave
+   * assignment map based on the first-level slave assignment map.
+   *
+   * @param firstInstanceSlaveAssignmentMap  the first slave assignment map for all instances
+   * @param replicaOrder the order (n) of the slave
+   * @return the n-th slave assignment map for all the instances
+   */
+  static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
+  {
+    Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
+
+    for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
+      result.put(currentInstance, resultAssignmentMap);
+    }
+
+    for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
+      Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
+      int offset = replicaOrder - 1;
+      for(String instance : previousSlaveAssignmentMap.keySet())
+      {
+        List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
+        // Obtain an array of other instances
+        for(String otherInstance : previousSlaveAssignmentMap.keySet())
+        {
+          otherInstances.add(otherInstance);
+        }
+        Collections.sort(otherInstances);
+        int instanceIndex = -1;
+        for(int index = 0;index < otherInstances.size(); index++)
+        {
+          if(otherInstances.get(index).equalsIgnoreCase(instance))
+          {
+            instanceIndex = index;
+          }
+        }
+        assert(instanceIndex >= 0);
+        if(instanceIndex == otherInstances.size() - 1)
+        {
+          instanceIndex --;
+        }
+        // Since we need to evenly distribute the slaves on "instance" to other partitions, we
+        // need to remove "instance" from the array.
+        otherInstances.remove(instance);
+
+        // distribute previous slave assignment to other instances.
+        List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
+        for(int i = 0; i < previousAssignmentList.size(); i++)
+        {
+
+          // Evenly distribute the previousAssignmentList to the remaining other instances
+          int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
+          String newInstance = otherInstances.get(newInstanceIndex);
+          if(!resultAssignmentMap.containsKey(newInstance))
+          {
+            resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
+          }
+          resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
+        }
+      }
+    }
+    return result;
+  }
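+
+  // Worked example of the rotation above (hypothetical): let node0's first-level slave map be
+  //   { node1 : [3], node2 : [5], node3 : [7] }
+  // With replicaOrder = 2 (offset = 1), partition 3 gets its 2nd slave on node3, partition 5 on
+  // node1, and partition 7 on node1. Because the first slave is removed from the candidate list,
+  // a partition's higher-order slave always lands on a different instance than its first slave.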
+
+  /**
+   * Given the current ideal state and the list of new instances to be added, calculate the
+   * new ideal state.
+   *
+   * 1. Calculate how many master partitions should be moved to the new batch of instances
+   * 2. Assign the number of master partitions px to be moved away from each previous node
+   * 3. For each previous node,
+   *    3.1 randomly choose px partitions and move them to a temp list
+   *    3.2 for each of the px partitions, remove it from the slave assignment map; record
+   *        the map position of the partition;
+   *    3.3 calculate the number of new nodes that should be put in the slave assignment map
+   *    3.4 even-fy the slave assignment map;
+   *    3.5 randomly place that number of new nodes in the map
+   *
+   * 4. From the temp master partition list obtained in 3.1,
+   *    4.1 randomly assign the partitions to nodes in the new batch
+   *
+   * 5. For each node in the new batch,
+   *    5.1 assemble the slave assignment map
+   *    5.2 even-fy the slave assignment map
+   *
+   * @param newInstances
+   *          list of newly added storage node instances
+   * @param previousIdealState
+   *          the previous ideal state
+   * @return a map that contains the updated ideal state info
+   */
+  public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
+  {
+    // Obtain the master / slave assignment info maps
+    Collections.sort(newInstances);
+    Map<String, List<Integer>> previousMasterAssignmentMap
+        = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
+    Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+        = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
+
+    List<String> oldInstances = new ArrayList<String>();
+    for(String oldInstance : previousMasterAssignmentMap.keySet())
+    {
+      oldInstances.add(oldInstance);
+    }
+
+    int previousInstanceNum = previousMasterAssignmentMap.size();
+    int partitions = (Integer)(previousIdealState.get(_partitions));
+
+    // TODO: take weight into account when calculating this
+
+    int totalMasterPartitionsToMove
+        = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
+    int numMastersFromEachNode = totalMasterPartitionsToMove / previousInstanceNum;
+    int remain = totalMasterPartitionsToMove % previousInstanceNum;
+
+    // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1)
+    // partitions, and we should first choose those (numMastersFromEachNode + 1) moves from the
+    // instances that have more master partitions
+    List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
+
+    // For corresponding moved slave partitions, keep track of their original location; the new node does not
+    // need to migrate all of them.
+    Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
+
+    // Make sure that the instances that hold more master partitions are put in front
+    List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
+    for(String oldInstance : previousMasterAssignmentMap.keySet())
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      if(masterAssignmentList.size() > numMastersFromEachNode)
+      {
+        bigList.add(oldInstance);
+      }
+      else
+      {
+        smallList.add(oldInstance);
+      }
+    }
+    // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
+    // new added batch of instances.
+    bigList.addAll(smallList);
+    int totalSlaveMoves = 0;
+    for(String oldInstance : bigList)
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      int numToChoose = numMastersFromEachNode;
+      if(remain > 0)
+      {
+        numToChoose = numMastersFromEachNode + 1;
+        remain --;
+      }
+      // Randomly pick numToChoose master partitions to move to the newly added nodes
+      ArrayList<Integer> masterPartitionsMoved = new ArrayList<Integer>();
+      randomSelect(masterAssignmentList, masterPartitionsMoved, numToChoose);
+
+      masterPartitionListToMove.addAll(masterPartitionsMoved);
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
+      removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartitionsMoved, slavePartitionsToMoveMap);
+
+      // Make sure that for old instances, the slave placement map is evenly distributed
+      // Trace the "local slave moves", which should together contribute to most of the slave migrations
+      int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
+      // System.out.println("local moves: "+ movesWithinInstance);
+      totalSlaveMoves += movesWithinInstance;
+    }
+    // System.out.println("local slave moves total: "+ totalSlaveMoves);
+    // Calculate the master/slave assignment for the newly added nodes
+
+    // We already have the list of master partitions that will migrate to the new batch of
+    // instances; shuffle the partitions and assign them to the new instances
+    Collections.shuffle(masterPartitionListToMove, new Random(12345));
+    for(int i = 0;i < newInstances.size(); i++)
+    {
+      String newInstance = newInstances.get(i);
+      List<Integer> masterPartitionList = new ArrayList<Integer>();
+      for(int j = 0;j < masterPartitionListToMove.size(); j++)
+      {
+        if(j % newInstances.size() == i)
+        {
+          masterPartitionList.add(masterPartitionListToMove.get(j));
+        }
+      }
+
+      Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
+      for(String oldInstance : oldInstances)
+      {
+        slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
+      }
+      // Build the slave assignment map for the new instance, based on the saved information
+      // about those slave partition locations in slavePartitionsToMoveMap
+      for(Integer x : masterPartitionList)
+      {
+        for(String oldInstance : slavePartitionsToMoveMap.keySet())
+        {
+          List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
+          if(slaves.contains(x))
+          {
+            slavePartitionMap.get(oldInstance).add(x);
+          }
+        }
+      }
+      // Add entries for the other new instances into the slavePartitionMap
+      List<String> otherNewInstances = new ArrayList<String>();
+      for(String instance : newInstances)
+      {
+        if(!instance.equalsIgnoreCase(newInstance))
+        {
+          otherNewInstances.add(instance);
+        }
+      }
+      // Make sure that slave partitions are evenly distributed
+      migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
+
+      // Update the result in the result map. We can reuse the input previousIdealState map as
+      // the result.
+      previousMasterAssignmentMap.put(newInstance, masterPartitionList);
+      nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
+
+    }
+    /*
+    // Print content of the master/ slave assignment maps
+    for(String instanceName : previousMasterAssignmentMap.keySet())
+    {
+      System.out.println(instanceName+":");
+      for(Integer x : previousMasterAssignmentMap.get(instanceName))
+      {
+        System.out.print(x+" ");
+      }
+      System.out.println("\nmaster partition moved:");
+
+      System.out.println();
+      System.out.println("Slave assignment:");
+
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+      for(String slaveName : slaveAssignmentMap.keySet())
+      {
+        System.out.print("\t" + slaveName +":\n\t" );
+        for(Integer x : slaveAssignmentMap.get(slaveName))
+        {
+          System.out.print(x + " ");
+        }
+        System.out.println("\n");
+      }
+    }
+
+    System.out.println("Master partitions migrated to new instances");
+    for(Integer x : masterPartitionListToMove)
+    {
+        System.out.print(x+" ");
+    }
+    System.out.println();
+
+    System.out.println("Slave partitions migrated to new instances");
+    for(String oldInstance : slavePartitionsToMoveMap.keySet())
+    {
+        System.out.print(oldInstance + ": ");
+        for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
+        {
+          System.out.print(x+" ");
+        }
+        System.out.println();
+    }
+    */
+    return previousIdealState;
+  }
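+
+  // A minimal expansion sketch (hypothetical batches): compute the initial state for one batch,
+  // then fold in a second batch; only about partitions * new / (old + new) master partitions move:
+  //
+  //   Map<String, Object> state = calculateInitialIdealState(batch1, 48, 2);
+  //   state = calculateNextIdealState(batch2, state);
+  //   ZNRecord record = convertToZNRecord(state, "MyDB", "MASTER", "SLAVE");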
+  
+  public static ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
+       String resourceName, String masterStateValue, String slaveStateValue)
+  {
+    return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
+  }
+  /**
+   * Given the list of master partitions that will be migrated away from the storage instance,
+   * remove their entries from the local instance slave assignment map.
+   *
+   * @param slaveAssignmentMap  the local instance slave assignment map
+   * @param masterPartitionsMoved the list of master partition ids that will be migrated away
+   * @param removedAssignmentMap keeps track of the removed slave assignment info. The info can be
+   *        used by newly added storage nodes.
+   */
+  static void removeFromSlaveAssignmentMap(Map<String, List<Integer>> slaveAssignmentMap, List<Integer> masterPartitionsMoved, Map<String, List<Integer>> removedAssignmentMap)
+  {
+    for(String instanceName : slaveAssignmentMap.keySet())
+    {
+      List<Integer> slaveAssignment = slaveAssignmentMap.get(instanceName);
+      for(Integer partitionId: masterPartitionsMoved)
+      {
+        if(slaveAssignment.contains(partitionId))
+        {
+          slaveAssignment.remove(partitionId);
+          if(!removedAssignmentMap.containsKey(instanceName))
+          {
+            removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
+          }
+          removedAssignmentMap.get(instanceName).add(partitionId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Since some new storage instances are added, each existing storage instance should migrate some
+   * slave partitions to the newly added instances.
+   *
+   * The algorithm keeps moving one partition from the instance that hosts the most slave partitions
+   * to the instance that hosts the fewest, until max - min <= 1.
+   *
+   * In this way we can guarantee that all instances host almost the same number of slave partitions
+   * and that the slave partitions are evenly distributed.
+   *
+   * @param nodeSlaveAssignmentMap the slave assignment map of one instance
+   * @param newInstances the list of newly added storage node instances
+   * @return the number of slave partitions moved onto the newly added instances
+   */
+  static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
+  {
+    int moves = 0;
+    boolean done = false;
+    for(String newInstance : newInstances)
+    {
+      nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
+    }
+    while(!done)
+    {
+      List<Integer> maxAssignment = null, minAssignment = null;
+      int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;
+      String minInstance = "";
+      for(String instanceName : nodeSlaveAssignmentMap.keySet())
+      {
+        List<Integer> slaveAssignment = nodeSlaveAssignmentMap.get(instanceName);
+        if(minCount > slaveAssignment.size())
+        {
+          minCount = slaveAssignment.size();
+          minAssignment = slaveAssignment;
+          minInstance = instanceName;
+        }
+        if(maxCount < slaveAssignment.size())
+        {
+          maxCount = slaveAssignment.size();
+          maxAssignment = slaveAssignment;
+        }
+      }
+      if(maxCount - minCount <= 1 )
+      {
+        done = true;
+      }
+      else
+      {
+        int indexToMove = -1;
+        // find a partition that is not contained in the minAssignment list
+        for(int i = 0; i < maxAssignment.size(); i++ )
+        {
+          if(!minAssignment.contains(maxAssignment.get(i)))
+          {
+            indexToMove = i;
+            break;
+          }
+        }
+
+        minAssignment.add(maxAssignment.get(indexToMove));
+        maxAssignment.remove(indexToMove);
+
+        if(newInstances.contains(minInstance))
+        {
+          moves++;
+        }
+      }
+    }
+    return moves;
+  }
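+
+  // Worked example of the balancing loop (hypothetical counts): with slave counts
+  //   { node0 : 5, node1 : 4, newNode : 0 }
+  // one partition moves at a time from the current maximum to the current minimum until
+  // max - min <= 1, ending at { node0 : 3, node1 : 3, newNode : 3 }; only the three moves
+  // onto newNode are counted in the returned value.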
+
+  /**
+   * Randomly select a number of elements from the original list, remove them from it and put
+   * them in the selectedList.
+   * The algorithm is used to select master partitions to be migrated when new instances are added.
+   *
+   * @param originalList  the original list; selected elements are removed from it
+   * @param selectedList  the list that will contain the selected elements
+   * @param num the number of elements to be selected
+   */
+  static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
+  {
+    assert(originalList.size() >= num);
+    int numRemains = originalList.size();
+    Random r = new Random(numRemains);
+    for(int j = 0;j < num; j++)
+    {
+      int randIndex = r.nextInt(numRemains);
+      selectedList.add(originalList.get(randIndex));
+      originalList.remove(randIndex);
+      numRemains --;
+    }
+  }
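+
+  // Example (hypothetical ids): randomSelect([0, 3, 7, 9], selected, 2) moves two randomly
+  // chosen ids, e.g. 7 and 0, out of the original list into `selected`, leaving [3, 9].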
+
+  public static void main(String args[])
+  {
+    List<String> instanceNames = new ArrayList<String>();
+    for(int i = 0;i < 10; i++)
+    {
+      instanceNames.add("localhost:123" + i);
+    }
+    int partitions = 48*3, replicas = 3;
+    Map<String, Object> resultOriginal = IdealStateCalculatorForStorageNode.calculateInitialIdealState(instanceNames, partitions, replicas);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java b/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
new file mode 100644
index 0000000..b022d78
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
@@ -0,0 +1,471 @@
+package org.apache.helix.tools;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.relation.MBeanServerNotificationFilter;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.Logger;
+
+public class JmxDumper implements NotificationListener
+{
+  public static final String help = "help";
+  public static final String domain = "domain";
+  public static final String fields = "fields";
+  public static final String pattern = "pattern";
+  public static final String operations = "operations";
+  public static final String period = "period";
+  public static final String className = "className";
+  public static final String outputFile = "outputFile";
+  public static final String jmxUrl = "jmxUrl";
+  public static final String sampleCount = "sampleCount";
+  
+  private static final Logger _logger = Logger.getLogger(JmxDumper.class);
+  String _domain;
+  MBeanServerConnection _mbeanServer;
+  
+  String _beanClassName;
+  String _namePattern;
+  int _samplePeriod;
+  
+  Map<ObjectName,ObjectName> _mbeanNames = new ConcurrentHashMap<ObjectName,ObjectName>();
+  Timer _timer;
+  
+  String _outputFileName;
+  
+  List<String> _outputFields = new ArrayList<String>();
+  Set<String> _operations = new HashSet<String>();
+  PrintWriter _outputFile;
+  int _samples = 0;
+  int _targetSamples = -1;
+  String _jmxUrl;
+  
+  public JmxDumper(String jmxService, 
+      String domain, 
+      String beanClassName, 
+      String namePattern, 
+      int samplePeriod, 
+      List<String> fields, 
+      List<String> operations, 
+      String outputfile,
+      int sampleCount
+      ) throws Exception
+  {
+    _jmxUrl = jmxService;
+    _domain = domain;
+    _beanClassName = beanClassName;
+    _samplePeriod = samplePeriod;
+    _outputFields.addAll(fields);
+    _operations.addAll(operations);
+    _outputFileName = outputfile;
+    _namePattern = namePattern;
+    _targetSamples = sampleCount;
+    
+    JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + _jmxUrl + "/jmxrmi");
+    JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+    
+    _mbeanServer = jmxc.getMBeanServerConnection();
+    MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+    filter.enableAllObjectNames();
+    _mbeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
+    init();
+    _timer = new Timer(true);
+    _timer.scheduleAtFixedRate(new SampleTask(), _samplePeriod, _samplePeriod);
+  }
+  
+  class SampleTask extends TimerTask
+  {
+    @Override
+    public void run()
+    {
+      List<ObjectName> errorMBeans = new ArrayList<ObjectName>();
+      _logger.info("Sampling " + _mbeanNames.size() + " beans");
+      for(ObjectName beanName : _mbeanNames.keySet())
+      {
+        MBeanInfo info;
+        try
+        {
+          info = _mbeanServer.getMBeanInfo(beanName);
+        }
+        catch (Exception e)
+        {
+          _logger.error(e.getMessage() + "; removing bean " + beanName);
+          errorMBeans.add(beanName);
+          continue;
+        }
+        if(!info.getClassName().equals(_beanClassName))
+        {
+          _logger.warn("Skip: className "+info.getClassName() + " expected : "+ _beanClassName);
+          continue;
+        }
+        StringBuilder line = new StringBuilder();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
+        String date = dateFormat.format(new Date());
+        line.append(date + " ");
+        line.append(beanName.toString() + " ");
+        
+        MBeanAttributeInfo[] infos = info.getAttributes();
+        Map<String, MBeanAttributeInfo> infoMap = new HashMap<String, MBeanAttributeInfo>();
+        for(MBeanAttributeInfo infoItem : infos)
+        {
+          infoMap.put(infoItem.getName(), infoItem);
+        }
+        
+        for(String outputField : _outputFields)
+        { 
+          try
+          {
+            if(infoMap.containsKey(outputField))
+            {
+              Object mbeanAttributeValue = _mbeanServer.getAttribute(beanName, outputField);
+              line.append(mbeanAttributeValue.toString() + " ");
+            }
+            else
+            {
+              _logger.warn(outputField + " not found");
+              line.append("null ");
+            }
+          }
+          catch (Exception e)
+          {
+            _logger.error("Error:", e);
+            line.append("null ");
+            continue;
+          }
+        }
+        MBeanOperationInfo[] operations = info.getOperations();
+        Map<String, MBeanOperationInfo> opeMap = new HashMap<String, MBeanOperationInfo>();
+        for(MBeanOperationInfo opeItem : operations)
+        {
+          opeMap.put(opeItem.getName(), opeItem);
+        }
+        
+        for(String ope : _operations)
+        { 
+          if(opeMap.containsKey(ope))
+          {
+            try
+            {
+              _mbeanServer.invoke(beanName, ope, new Object[0], new String[0]);
+              //System.out.println(ope+" invoked");
+            }
+            catch(Exception e)
+            {
+              _logger.error("Error:", e);
+              continue;
+            }
+          }
+        }
+        _outputFile.println(line.toString());
+        //System.out.println(line);
+      }
+      for(ObjectName deadBean : errorMBeans)
+      {
+        _mbeanNames.remove(deadBean);
+      }
+
+      _samples ++;
+      //System.out.println("samples:"+_samples);
+      if(_samples == _targetSamples)
+      {
+        synchronized(JmxDumper.this)
+        {
+          _logger.info(_samples + " samples done, exiting...");
+          JmxDumper.this.notifyAll();
+        }
+      }
+    }
+  }
+  
+  void init() throws Exception
+  {
+    try
+    {
+      Set<ObjectInstance> existingInstances = _mbeanServer.queryMBeans(new ObjectName(_namePattern), null);
+      _logger.info("Total " + existingInstances.size() + " mbeans matched " + _namePattern);
+      for(ObjectInstance instance : existingInstances)
+      {
+        if(instance.getClassName().equals(_beanClassName))
+        {
+          _mbeanNames.put(instance.getObjectName(), instance.getObjectName());
+          _logger.info("Sampling " + instance.getObjectName());
+        }
+      }
+      FileWriter fos = new FileWriter(_outputFileName); 
+      System.out.println(_outputFileName);
+      _outputFile = new PrintWriter(fos);
+    }
+    catch (Exception e)
+    {
+      _logger.error("fail to get all existing mbeans in " + _domain, e);
+      throw e;
+    }  
+  }
+  @Override
+  public void handleNotification(Notification notification, Object handback)
+  {
+    MBeanServerNotification mbs = (MBeanServerNotification) notification;
+    if(MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(mbs.getType())) 
+    {
+      //System.out.println("Adding mbean " + mbs.getMBeanName());
+      _logger.info("Adding mbean " + mbs.getMBeanName());
+      if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      { 
+        addMBean( mbs.getMBeanName());
+      } 
+    }
+    else if(MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(mbs.getType())) 
+    {
+      //System.out.println("Removing mbean " + mbs.getMBeanName());
+      _logger.info("Removing mbean " + mbs.getMBeanName());
+      if(mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      {
+        removeMBean(mbs.getMBeanName());
+      }
+    }
+  }
+  
+  private void addMBean(ObjectName beanName)
+  {
+    _mbeanNames.put(beanName, beanName);
+  }
+  
+  private void removeMBean(ObjectName beanName)
+  {
+    _mbeanNames.remove(beanName);
+  }
+  
+  public static int processCommandLineArgs(String[] cliArgs) throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try
+    {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    }
+    catch (ParseException pe)
+    {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    boolean ret = checkOptionArgsNumber(cmd.getOptions());
+    if (!ret)
+    {
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    
+    String jmxUrlStr = cmd.getOptionValue(jmxUrl);
+    
+    String periodStr = cmd.getOptionValue(period);
+    int periodVal = Integer.parseInt(periodStr);
+    
+    String domainStr = cmd.getOptionValue(domain);
+    String classNameStr = cmd.getOptionValue(className);
+    String patternStr = cmd.getOptionValue(pattern);
+    String fieldsStr = cmd.getOptionValue(fields);
+    String operationsStr = cmd.getOptionValue(operations);
+    String resultFile = cmd.getOptionValue(outputFile);
+    String sampleCountStr = cmd.getOptionValue(sampleCount, "-1");
+    int sampleCount = Integer.parseInt(sampleCountStr);
+    
+    List<String> fields = Arrays.asList(fieldsStr.split(","));
+    List<String> operations = Arrays.asList(operationsStr.split(","));
+    
+    JmxDumper dumper = null;
+    try
+    {
+      dumper = new JmxDumper(jmxUrlStr, domainStr, classNameStr, patternStr, periodVal, fields, operations, resultFile, sampleCount);
+      synchronized(dumper)
+      {
+        dumper.wait();
+      }
+    }
+    finally
+    {
+      if(dumper != null)
+      {
+        dumper.flushFile();
+      }
+    }
+    return 0;
+  }
+  
+  private void flushFile()
+  {
+    if(_outputFile != null)
+    {
+      _outputFile.flush();
+      _outputFile.close();
+    }
+  }
+  private static boolean checkOptionArgsNumber(Option[] options)
+  {
+    for (Option option : options)
+    {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      if (argNb == 0)
+      {
+        if (args != null && args.length > 0)
+        {
+          System.err.println(option.getArgName() + " shall have " + argNb
+              + " arguments (was " + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+      else
+      {
+        if (args == null || args.length != argNb)
+        {
+          System.err.println(option.getArgName() + " shall have " + argNb
+              + " arguments (was " + Arrays.toString(args) + ")");
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+  
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help)
+                     .withDescription("Prints command-line options info")
+                     .create();
+    Option domainOption =
+        OptionBuilder.withLongOpt(domain)
+                     .withDescription("Domain of the JMX bean")
+                     .create();
+
+    domainOption.setArgs(1);
+    domainOption.setRequired(true);
+    
+    Option fieldsOption =
+        OptionBuilder.withLongOpt(fields)
+                     .withDescription("Fields of the JMX bean to sample")
+                     .create();
+    fieldsOption.setArgs(1);
+    fieldsOption.setRequired(false);
+    
+    Option operationOption =
+        OptionBuilder.withLongOpt(operations)
+                     .withDescription("Operation to invoke")
+                     .create();
+    operationOption.setArgs(1);
+    operationOption.setRequired(true);
+    
+    Option periodOption =
+        OptionBuilder.withLongOpt(period)
+                     .withDescription("Sampling period in MS")
+                     .create();
+    periodOption.setArgs(1);
+    periodOption.setRequired(false);
+    
+    Option classOption =
+        OptionBuilder.withLongOpt(className)
+                     .withDescription("Classname of the MBean")
+                     .create();
+    classOption.setArgs(1);
+    classOption.setRequired(true);
+    
+    Option patternOption =
+        OptionBuilder.withLongOpt(pattern)
+                     .withDescription("pattern of the MBean")
+                     .create();
+    patternOption.setArgs(1);
+    patternOption.setRequired(true);
+    
+    Option outputFileOption =
+        OptionBuilder.withLongOpt(outputFile)
+                     .withDescription("outputFileName")
+                     .create();
+    outputFileOption.setArgs(1);
+    outputFileOption.setRequired(false);
+    
+    Option jmxUrlOption =
+        OptionBuilder.withLongOpt(jmxUrl)
+                     .withDescription("jmx port to connect to")
+                     .create();
+    jmxUrlOption.setArgs(1);
+    jmxUrlOption.setRequired(true);
+    
+    Option sampleCountOption =
+        OptionBuilder.withLongOpt(sampleCount)
+                     .withDescription("# of samples to take")
+                     .create();
+    sampleCountOption.setArgs(1);
+    sampleCountOption.setRequired(false);
+    
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(domainOption);
+    options.addOption(fieldsOption);
+    options.addOption(operationOption);
+    options.addOption(classOption);
+    options.addOption(outputFileOption);
+    options.addOption(jmxUrlOption);
+    options.addOption(patternOption);
+    options.addOption(periodOption);
+    options.addOption(sampleCountOption);
+    return options;
+  }
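+
+  // A hypothetical invocation (all option values are illustrative, mirroring the commented-out
+  // example in main below):
+  //
+  //   java org.apache.helix.tools.JmxDumper --jmxUrl localhost:27961 \
+  //     --domain org.apache.ZooKeeperService \
+  //     --className org.apache.zookeeper.server.ConnectionBean \
+  //     --pattern "org.apache.ZooKeeperService:name0=*,name1=Connections,name2=*,name3=*" \
+  //     --fields AvgLatency,MaxLatency --operations resetCounters \
+  //     --period 1000 --outputFile /tmp/1.csv --sampleCount 60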
+  
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + JmxDumper.class.getName(), cliOptions);
+  }
+  
+  public static void main(String[] args) throws Exception
+  {
+   /* List<String> fields = Arrays.asList(new String("AvgLatency,MaxLatency,MinLatency,PacketsReceived,PacketsSent").split(","));
+    List<String> operations = Arrays.asList(new String("resetCounters").split(","));
+    
+    JmxDumper dumper = new JmxDumper("localhost:27961", "org.apache.zooKeeperService", "org.apache.zookeeper.server.ConnectionBean", "org.apache.ZooKeeperService:name0=*,name1=Connections,name2=*,name3=*", 1000, fields, operations, "/tmp/1.csv", -1);
+    Thread.currentThread().join();
+    */
+    int ret = processCommandLineArgs(args);
+    System.exit(ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java b/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
new file mode 100644
index 0000000..1693fbf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+
+/**
+ * Provides the ability to start ZooKeeper locally on a particular port
+ *
+ * @author kgopalak
+ *
+ */
+public class LocalZKServer
+{
+  public void start(int port, String dataDir, String logDir) throws Exception
+  {
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+    {
+
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient)
+      {
+
+      }
+    };
+    ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    server.start();
+    Thread.currentThread().join();
+  }
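+
+  // A quick connectivity check against the embedded server (hypothetical port; note that
+  // start() blocks the calling thread via Thread.currentThread().join()):
+  //
+  //   ZkClient client = new ZkClient("localhost:2199");
+  //   client.createPersistent("/test");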
+
+  public static void main(String[] args) throws Exception
+  {
+    int port = 2199;
+    String rootDir = System.getProperty("java.io.tmpdir") + "/zk-helix/"
+        + System.currentTimeMillis();
+    String dataDir = rootDir + "/dataDir";
+    String logDir = rootDir + "/logDir";
+
+    if (args.length > 0)
+    {
+      port = Integer.parseInt(args[0]);
+    }
+    if (args.length > 1)
+    {
+      dataDir = args[1];
+      logDir = args[1];
+    }
+
+    if (args.length > 2)
+    {
+      logDir = args[2];
+    }
+    System.out.println("Starting Zookeeper locally at port:" + port
+        + " dataDir:" + dataDir + " logDir:" + logDir);
+    LocalZKServer localZKServer = new LocalZKServer();
+    
+    localZKServer.start(port, dataDir, logDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
new file mode 100644
index 0000000..9e1bf09
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.UUID;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.HelixUtil;
+
+
+public class MessagePoster
+{
+  public void post(String zkServer,
+                   Message message,
+                   String clusterName,
+                   String instanceName)
+  {
+    ZkClient client = new ZkClient(zkServer);
+    client.setZkSerializer(new ZNRecordSerializer());
+    String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/" + message.getId();
+    client.delete(path);
+    ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName, instanceName));
+    message.setTgtSessionId(record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()));
+    message.setTgtName(record.getId());
+    //System.out.println(message);
+    client.createPersistent(path, message.getRecord());
+  }
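+
+  // The write above lands on the target instance's message queue in ZooKeeper, roughly:
+  //
+  //   /<clusterName>/INSTANCES/<instanceName>/MESSAGES/<msgId>
+  //
+  // The target session id is read from the instance's live-instance node, so posting fails
+  // if the instance is not currently live.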
+
+  public void postFaultInjectionMessage(String zkServer,
+                                        String clusterName,
+                                        String instanceName,
+                                        String payloadString,
+                                        String partition)
+  {
+    Message message = new Message("FaultInjection", UUID.randomUUID().toString());
+    if(payloadString != null)
+    {
+      message.getRecord().setSimpleField("faultType", payloadString);
+    }
+    if(partition != null)
+    {
+      message.setPartitionName(partition);
+    }
+    
+    post(zkServer, message, clusterName, instanceName);
+  }
+
+  public void postTestMessage(String zkServer, String clusterName, String instanceName)
+  {
+    String msgSrc = "cm-instance-0";
+    String msgId = "TestMessageId-2";
+
+    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+    message.setMsgId(msgId);
+    message.setSrcName(msgSrc);
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setFromState("Slave");
+    message.setToState("Master");
+    message.setPartitionName("EspressoDB.partition-0." + instanceName);
+
+    post(zkServer, message, clusterName, instanceName);
+  }
+
+  public static void main(String[] args)
+  {
+    if (args.length < 4 || args.length > 6)
+    {
+      System.err.println("Usage: java " + MessagePoster.class.getName()
+          + " zkServer cluster instance msgType [payloadString] [partition]");
+      System.err.println("msgType can be one of: test, fault");
+      System.err.println("payloadString and partition are sent along with the fault msgType");
+      System.exit(1);
+    }
+    String zkServer = args[0];
+    String cluster = args[1];
+    String instance = args[2];
+    String msgType = args[3];
+    String payloadString = (args.length >= 5 ? args[4] : null);
+    String partition = (args.length == 6 ? args[5] : null);
+
+    MessagePoster messagePoster = new MessagePoster();
+    if (msgType.equals("test"))
+    {
+      messagePoster.postTestMessage(zkServer, cluster, instance);
+    }
+    else if (msgType.equals("fault"))
+    {
+      messagePoster.postFaultInjectionMessage(zkServer,
+                                              cluster,
+                                              instance,
+                                              payloadString,
+                                              partition);
+      System.out.println("Posted " + msgType);
+    }
+    else
+    {
+      System.err.println("Message was not posted. Unknown msgType:" + msgType);
+    }
+  }
+}

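For reference, a minimal sketch of driving MessagePoster from code rather than the command line. The ZooKeeper address, cluster, instance, fault type and partition names below are placeholders; it assumes a running cluster whose live-instance znode exists for the target instance.

    import org.apache.helix.tools.MessagePoster;

    public class MessagePosterExample
    {
      public static void main(String[] args)
      {
        // Placeholder endpoint and names; substitute real ones.
        String zkServer = "localhost:2181";
        MessagePoster poster = new MessagePoster();
        // Equivalent to the "test" msgType on the command line:
        poster.postTestMessage(zkServer, "test-cluster", "localhost_12918");
        // Equivalent to the "fault" msgType with a payload and a partition:
        poster.postFaultInjectionMessage(zkServer, "test-cluster", "localhost_12918",
                                         "node-restart", "EspressoDB.partition-0");
      }
    }
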
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java b/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
new file mode 100644
index 0000000..a5d5472
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class PropertiesReader
+{
+  private static final Logger LOG = Logger
+      .getLogger(PropertiesReader.class.getName());
+
+  private final Properties _properties = new Properties();
+
+  public PropertiesReader(String propertyFileName)
+  {
+    InputStream stream = Thread.currentThread().getContextClassLoader()
+        .getResourceAsStream(propertyFileName);
+    if (stream == null)
+    {
+      throw new IllegalArgumentException("could not find properties file on classpath: "
+          + propertyFileName);
+    }
+    try
+    {
+      _properties.load(stream);
+      stream.close();
+    }
+    catch (Exception e)
+    {
+      String errMsg = "could not open properties file: " + propertyFileName;
+      LOG.error(errMsg, e);
+      throw new IllegalArgumentException(errMsg, e);
+    }
+  }
+
+  public String getProperty(String key)
+  {
+    String value = _properties.getProperty(key);
+    if (value == null)
+    {
+      throw new IllegalArgumentException("no property exist for key:" + key);
+    }
+
+    return value;
+  }
+}

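A short usage sketch for PropertiesReader; "cluster.properties" and "zk.address" are hypothetical names for a classpath resource and a key it contains.

    import org.apache.helix.tools.PropertiesReader;

    public class PropertiesReaderExample
    {
      public static void main(String[] args)
      {
        PropertiesReader reader = new PropertiesReader("cluster.properties");
        // getProperty throws IllegalArgumentException for a missing key,
        // so a non-null return value is guaranteed.
        String zkAddress = reader.getProperty("zk.address");
        System.out.println("zk.address = " + zkAddress);
      }
    }
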
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java b/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
new file mode 100644
index 0000000..202763e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
@@ -0,0 +1,352 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+public class RUSHrHash
+{
+  /**
+   * how many replicas to create for an object
+   */
+  protected int replicationDegree = 1;
+
+  /**
+   * an array of hash maps where each hash map holds info on the sub-cluster
+   * that corresponds to its array index, i.e. array element 0 holds data for
+   * sub-cluster 0; populated at construction time only
+   */
+  protected HashMap[] clusters;
+
+  /**
+   * an array of hash maps where each element holds data for a sub cluster
+   */
+  protected HashMap[] clusterConfig;
+
+  /**
+   * total number of sub-clusters in our data configuration; populated at
+   * construction time only
+   */
+  protected int totalClusters = 0;
+
+  /**
+   * the total number of nodes across all of the sub-clusters; populated at
+   * construction time only
+   */
+  protected int totalNodes = 0;
+
+  /**
+   * the weighted total number of nodes: each sub-cluster contributes its node
+   * count multiplied by its weight; populated at construction time only
+   */
+  protected int totalNodesW = 0;
+
+  /**
+   * an array of HashMaps where each HashMap holds the data for a single node
+   */
+  protected HashMap[] nodes = null;
+
+  /**
+   * constant used to help seed the random number generator
+   */
+  protected final int SEED_PARAM = 1560;
+
+  /**
+   * random number generator
+   */
+
+  Random ran = new Random();
+
+  /**
+   * maximum value we can have from the ran generator
+   */
+  float ranMax = (float) Math.pow(2.0, 16.0);
+
+  /**
+   * The constructor analyzes the passed config to obtain the fundamental values
+   * and data structures for locating a node: this.clusters, this.totalClusters
+   * and this.totalNodes. Each of those values is described in detail above with
+   * its property, and each is derived from the config passed to the locator.
+   * 
+   * @param conf
+   *          dataConfig
+   * 
+   * @throws Exception
+   */
+
+  public RUSHrHash(HashMap<String, Object> conf) throws Exception
+  {
+
+    clusterConfig = (HashMap[]) conf.get("subClusters");
+    replicationDegree = (Integer) conf.get("replicationDegree");
+
+    HashMap[] subClusters = (HashMap[]) conf.get("subClusters");
+    totalClusters = subClusters.length;
+    clusters = new HashMap[totalClusters];
+    // check the config for all of the params;
+    // throw an exception if they are not there
+    if (totalClusters <= 0)
+    {
+      throw new Exception(
+          "data config to the RUSHr locator does not contain a valid clusters property");
+    }
+
+    int nodeCt = 0;
+    HashMap[] nodeData = null;
+    ArrayList<HashMap> tempNodes = new ArrayList<HashMap>();
+    HashMap subCluster = null, clusterData = null;
+    Integer clusterDataList[] = null;
+    for (int i = 0; i < totalClusters; i++)
+    {
+      subCluster = subClusters[i];
+      nodeData = (HashMap[]) subCluster.get("nodes");
+
+      nodeCt = nodeData.length;
+      clusterDataList = new Integer[nodeCt];
+      for (int n = 0; n < nodeCt; n++)
+      {
+        tempNodes.add(nodeData[n]);
+        clusterDataList[n] = n;
+      }
+      totalNodes += nodeCt;
+      totalNodesW += nodeCt * (Integer) subCluster.get("weight");
+
+      clusterData = new HashMap<String, Object>();
+      clusterData.put("count", nodeCt);
+      clusterData.put("list", clusterDataList);
+      clusters[i] = clusterData;
+    }
+    nodes = new HashMap[totalNodes];
+    tempNodes.toArray(nodes);
+  }
+
+  /**
+   * This function is an implementation of the RUSHr algorithm as described by
+   * R. J. Honicky and Ethan Miller
+   * 
+   * @param objKey
+   *          a long used as the prng seed, usually derived from a string hash
+   * @throws Exception
+   * @return the list of node data maps for the replicas chosen to hold the object
+   */
+  public ArrayList<HashMap> findNode(long objKey) throws Exception
+  {
+
+    HashMap[] c = this.clusters;
+    int sumRemainingNodes = this.totalNodes;
+    int sumRemainingNodesW = this.totalNodesW;
+    int repDeg = this.replicationDegree;
+    int totClu = this.totalClusters;
+    int totNod = this.totalNodes;
+    HashMap[] clusConfig = this.clusterConfig;
+
+    // throw an exception if the data is no good
+    if ((totNod <= 0) || (totClu <= 0))
+    {
+      throw new Exception(
+          "the total nodes or total clusters is negative or 0.  bad joo joos!");
+    }
+
+    // get the starting cluster
+    int currentCluster = totClu - 1;
+
+    /**
+     * this loop is an implementation of the RUSHr algorithm for fast placement
+     * and location of objects in a distributed storage system
+     * 
+     * j = current cluster
+     * m = disks in current cluster
+     * n = remaining nodes
+     */
+    ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
+    while (true)
+    {
+
+      // prevent an infinite loop, in case there is a bug
+      if (currentCluster < 0)
+      {
+        throw new Exception(
+            "the cluster index became negative while we were looking for the following id: "
+                + objKey + ".  This should never happen with any key.  There is a bug or maybe your joo joos are BAD!");
+      }
+
+      HashMap clusterData = clusConfig[currentCluster];
+      Integer weight = (Integer) clusterData.get("weight");
+
+      Integer disksInCurrentCluster = (Integer) c[currentCluster].get("count");
+      sumRemainingNodes -= disksInCurrentCluster;
+
+      Integer disksInCurrentClusterW = disksInCurrentCluster * weight;
+      sumRemainingNodesW -= disksInCurrentClusterW;
+
+      // set the seed to our set id
+      long seed = objKey + currentCluster;
+      ran.setSeed(seed);
+      int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes)
+          : 0;
+
+      int u = t
+          + drawWHG(repDeg - t, disksInCurrentClusterW - t,
+              disksInCurrentClusterW + sumRemainingNodesW - t, weight);
+      if (u > 0)
+      {
+        if (u > disksInCurrentCluster)
+        {
+          u = disksInCurrentCluster;
+        }
+        ran.setSeed(objKey + currentCluster + SEED_PARAM);
+        choose(u, currentCluster, sumRemainingNodes, nodeData);
+        reset(u, currentCluster);
+        repDeg -= u;
+      }
+      if (repDeg == 0)
+      {
+        break;
+      }
+      currentCluster--;
+    }
+    return nodeData;
+  }
+
+  /**
+   * This function is an implementation of the RUSHr algorithm as described by
+   * R. J. Honicky and Ethan Miller
+   * 
+   * @param objKey
+   *          a string identifier; its CRC32 hash is used as the prng seed
+   * 
+   * @return the list of node data maps for the replicas chosen to hold the
+   *         object, one entry per replica
+   * @throws Exception
+   */
+  public ArrayList<HashMap> findNode(String objKey) throws Exception
+  {
+    // turn a string identifier into an integer for the random seed
+    CRC32 crc32 = new CRC32();
+    byte[] bytes = objKey.getBytes();
+    crc32.update(bytes);
+    long crc32Value = crc32.getValue();
+    long objKeyLong = (crc32Value >> 16) & 0x7fff;
+    return findNode(objKeyLong);
+  }
+
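+  /**
+   * undoes the swaps performed by choose(), restoring the cluster's
+   * permutation list to identity order so that later lookups are
+   * deterministic
+   */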
+  public void reset(int nodesToRetrieve, int currentCluster)
+  {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    Integer count = (Integer) clusters[currentCluster].get("count");
+
+    int listIdx;
+    int val;
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
+    {
+      listIdx = count - nodesToRetrieve + nodeIdx;
+      val = list[listIdx];
+      if (val < (count - nodesToRetrieve))
+      {
+        list[val] = val;
+      }
+      list[listIdx] = listIdx;
+    }
+  }
+
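+  /**
+   * picks nodesToRetrieve distinct nodes from the current cluster via a
+   * partial Fisher-Yates shuffle of the cluster's permutation list, appending
+   * each chosen node's data map to nodeData
+   */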
+  public void choose(int nodesToRetrieve, int currentCluster,
+      int remainingNodes, ArrayList<HashMap> nodeData)
+  {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    Integer count = (Integer) clusters[currentCluster].get("count");
+
+    int maxIdx;
+    int randNode;
+    int chosen;
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
+    {
+      maxIdx = count - nodeIdx - 1;
+      randNode = ran.nextInt(maxIdx + 1);
+      // swap
+      chosen = list[randNode];
+      list[randNode] = list[maxIdx];
+      list[maxIdx] = chosen;
+      // add the remaining nodes so we can find the node data when we are done
+      nodeData.add(nodes[remainingNodes + chosen]);
+    }
+  }
+
+  /**
+   * convenience alias for {@link #findNode(String)}
+   * 
+   * @param objKey
+   * @return the list of node data maps for the chosen replicas
+   * @throws Exception
+   */
+  public ArrayList<HashMap> findNodes(String objKey) throws Exception
+  {
+    return findNode(objKey);
+  }
+
+  public int getReplicationDegree()
+  {
+    return replicationDegree;
+  }
+
+  public int getTotalNodes()
+  {
+    return totalNodes;
+  }
+
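+  /**
+   * draws how many of the requested replicas land in the current cluster by
+   * sampling a weighted hypergeometric distribution: each draw succeeds with
+   * probability disksInCurrentCluster / totalDisks, and both counts shrink by
+   * the cluster weight as the draws proceed
+   */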
+  public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks,
+      int weight)
+  {
+    int found = 0;
+    float z;
+    float prob;
+    int ranInt;
+
+    for (int i = 0; i < replicas; i++)
+    {
+      if (totalDisks != 0)
+      {
+        ranInt = ran.nextInt((int) (ranMax + 1));
+        z = ((float) ranInt / ranMax);
+        prob = ((float) disksInCurrentCluster / (float) totalDisks);
+        if (z <= prob)
+        {
+          found++;
+          disksInCurrentCluster -= weight;
+        }
+        totalDisks -= weight;
+      }
+    }
+    return found;
+  }
+}

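To make the expected config shape concrete, here is a hedged sketch of constructing the nested HashMap config and locating replicas. The "host" field on each node map is hypothetical metadata; the locator only requires that "subClusters", "nodes", "weight" and "replicationDegree" be present.

    import java.util.ArrayList;
    import java.util.HashMap;

    import org.apache.helix.tools.RUSHrHash;

    public class RUSHrHashExample
    {
      public static void main(String[] args) throws Exception
      {
        // two sub-clusters of two nodes each, equal weight
        HashMap[] subClusters = new HashMap[2];
        for (int c = 0; c < 2; c++)
        {
          HashMap[] nodes = new HashMap[2];
          for (int n = 0; n < 2; n++)
          {
            HashMap<String, Object> node = new HashMap<String, Object>();
            node.put("host", "node-" + c + "-" + n); // arbitrary node metadata
            nodes[n] = node;
          }
          HashMap<String, Object> subCluster = new HashMap<String, Object>();
          subCluster.put("nodes", nodes);
          subCluster.put("weight", 1);
          subClusters[c] = subCluster;
        }

        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put("subClusters", subClusters);
        conf.put("replicationDegree", 2);

        RUSHrHash locator = new RUSHrHash(conf);
        // each entry is one of the node maps built above
        ArrayList<HashMap> replicas = locator.findNode("myObjectKey");
        for (HashMap replica : replicas)
        {
          System.out.println(replica.get("host"));
        }
      }
    }
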
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
new file mode 100644
index 0000000..9c07c13
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -0,0 +1,347 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+
+
+public class StateModelConfigGenerator
+{
+
+  public static void main(String[] args)
+  {
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    StateModelConfigGenerator generator = new StateModelConfigGenerator();
+    System.out.println(new String(serializer.serialize(generator.generateConfigForMasterSlave())));
+  }
+
+  /**
+   * Meaning of the "count" value in each state's ".meta" map:
+   * -1 : don't care
+   * any numeric value > 0 : will be satisfied if possible, based on priority
+   * N  : all nodes in the cluster will be assigned to this state if possible
+   * R  : all remaining nodes in the preference list will be assigned to this
+   *      state; applies only to the last state
+   */
+
+  public ZNRecord generateConfigForStorageSchemata()
+  {
+    ZNRecord record = new ZNRecord("STORAGE_DEFAULT_SM_SCHEMATA");
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+                          "OFFLINE");
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("MASTER");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("DROPPED");
+    statePriorityList.add("ERROR");
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+                        statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("MASTER"))
+      {
+        metadata.put("count", "N");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("OFFLINE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("DROPPED"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("ERROR"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+    }
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      if (state.equals("MASTER"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("OFFLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("MASTER", "MASTER");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("ERROR"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("OFFLINE", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+    }
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("MASTER-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-MASTER");
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+    return record;
+  }
+
+  public ZNRecord generateConfigForMasterSlave()
+  {
+    ZNRecord record = new ZNRecord("MasterSlave");
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+                          "OFFLINE");
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("MASTER");
+    statePriorityList.add("SLAVE");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("DROPPED");
+    statePriorityList.add("ERROR");
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+                        statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("MASTER"))
+      {
+        metadata.put("count", "1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("SLAVE"))
+      {
+        metadata.put("count", "R");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("OFFLINE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("DROPPED"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("ERROR"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+    }
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      if (state.equals("MASTER"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("SLAVE", "SLAVE");
+        metadata.put("OFFLINE", "SLAVE");
+        metadata.put("DROPPED", "SLAVE");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("SLAVE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("MASTER", "MASTER");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("OFFLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("SLAVE", "SLAVE");
+        metadata.put("MASTER", "SLAVE");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("ERROR"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("OFFLINE", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+    }
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("MASTER-SLAVE");
+    stateTransitionPriorityList.add("SLAVE-MASTER");
+    stateTransitionPriorityList.add("OFFLINE-SLAVE");
+    stateTransitionPriorityList.add("SLAVE-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-DROPPED");
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+    return record;
+  }
+
+  public ZNRecord generateConfigForLeaderStandby()
+  {
+    ZNRecord record = new ZNRecord("LeaderStandby");
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+                          "OFFLINE");
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("LEADER");
+    statePriorityList.add("STANDBY");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("DROPPED");
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+                        statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("LEADER"))
+      {
+        metadata.put("count", "1");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("STANDBY"))
+      {
+        metadata.put("count", "R");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("OFFLINE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("DROPPED"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+
+    }
+
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      if (state.equals("LEADER"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("STANDBY", "STANDBY");
+        metadata.put("OFFLINE", "STANDBY");
+        metadata.put("DROPPED", "STANDBY");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("STANDBY"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("LEADER", "LEADER");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("OFFLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("STANDBY", "STANDBY");
+        metadata.put("LEADER", "STANDBY");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      }
+
+    }
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("LEADER-STANDBY");
+    stateTransitionPriorityList.add("STANDBY-LEADER");
+    stateTransitionPriorityList.add("OFFLINE-STANDBY");
+    stateTransitionPriorityList.add("STANDBY-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-DROPPED");
+
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+    return record;
+  }
+
+  public ZNRecord generateConfigForOnlineOffline()
+  {
+    ZNRecord record = new ZNRecord("OnlineOffline");
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+                          "OFFLINE");
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("ONLINE");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("DROPPED");
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+                        statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("ONLINE"))
+      {
+        metadata.put("count", "R");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("OFFLINE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("DROPPED"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+    }
+
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      if (state.equals("ONLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      if (state.equals("OFFLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "ONLINE");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      }
+    }
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("OFFLINE-ONLINE");
+    stateTransitionPriorityList.add("ONLINE-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-DROPPED");
+
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+    return record;
+  }
+}

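A quick sketch of exercising the generator beyond the MasterSlave case already covered by main; serialize(...) returns a byte array, as in the main method above.

    import org.apache.helix.manager.zk.ZNRecordSerializer;
    import org.apache.helix.tools.StateModelConfigGenerator;

    public class StateModelConfigExample
    {
      public static void main(String[] args)
      {
        StateModelConfigGenerator generator = new StateModelConfigGenerator();
        ZNRecordSerializer serializer = new ZNRecordSerializer();
        // dump each canned state model definition as JSON
        System.out.println(new String(serializer.serialize(generator.generateConfigForMasterSlave())));
        System.out.println(new String(serializer.serialize(generator.generateConfigForLeaderStandby())));
        System.out.println(new String(serializer.serialize(generator.generateConfigForOnlineOffline())));
        System.out.println(new String(serializer.serialize(generator.generateConfigForStorageSchemata())));
      }
    }
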
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java b/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
new file mode 100644
index 0000000..28ca786
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import org.apache.helix.HelixManager;
+
+public class TestCommand
+{
+  public enum CommandType
+  {
+    MODIFY,
+    VERIFY,
+    START,
+    STOP
+  }
+
+  public static class NodeOpArg
+  {
+    public HelixManager _manager;
+    public Thread _thread;
+
+    public NodeOpArg(HelixManager manager, Thread thread)
+    {
+      _manager = manager;
+      _thread = thread;
+    }
+  }
+
+  public TestTrigger _trigger;
+  public CommandType _commandType;
+  public ZnodeOpArg _znodeOpArg;
+  public NodeOpArg _nodeOpArg;
+
+  public long _startTimestamp;
+  public long _finishTimestamp;
+
+  /**
+   * creates a command with a default trigger (new TestTrigger())
+   *
+   * @param type
+   *          the command type
+   * @param arg
+   *          the znode operation argument
+   */
+  public TestCommand(CommandType type, ZnodeOpArg arg)
+  {
+    this(type, new TestTrigger(), arg);
+  }
+
+  /**
+   * creates a znode-operation command
+   *
+   * @param type
+   *          the command type
+   * @param trigger
+   *          the trigger that controls when the command fires
+   * @param arg
+   *          the znode operation argument
+   */
+  public TestCommand(CommandType type, TestTrigger trigger, ZnodeOpArg arg)
+  {
+    _commandType = type;
+    _trigger = trigger;
+    _znodeOpArg = arg;
+  }
+
+  /**
+   * creates a node-operation command
+   *
+   * @param type
+   *          the command type
+   * @param trigger
+   *          the trigger that controls when the command fires
+   * @param arg
+   *          the node operation argument
+   */
+  public TestCommand(CommandType type, TestTrigger trigger, NodeOpArg arg)
+  {
+    _commandType = type;
+    _trigger = trigger;
+    _nodeOpArg = arg;
+  }
+
+  @Override
+  public String toString()
+  {
+    String className = super.toString();
+    String ret = className.substring(className.lastIndexOf(".") + 1) + " ";
+    if (_finishTimestamp > 0)
+    {
+      ret += "FINISH@" + _finishTimestamp + "-START@" + _startTimestamp
+               + "=" + (_finishTimestamp - _startTimestamp) + "ms ";
+    }
+    if (_commandType == CommandType.MODIFY || _commandType == CommandType.VERIFY)
+    {
+      ret += _commandType.toString() + "|" + _trigger.toString() + "|" + _znodeOpArg.toString();
+    }
+    else if (_commandType == CommandType.START || _commandType == CommandType.STOP)
+    {
+      ret += _commandType.toString() + "|" + _trigger.toString() + "|" + _nodeOpArg.toString();
+    }
+
+    return ret;
+  }
+}
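
A minimal sketch of building a node-operation command. The null manager and idle thread are stand-ins for the HelixManager and participant thread a real test harness would supply (e.g. via HelixManagerFactory), and it assumes TestTrigger lives in the same org.apache.helix.tools package, as its unqualified use above suggests.

    import org.apache.helix.HelixManager;
    import org.apache.helix.tools.TestCommand;
    import org.apache.helix.tools.TestCommand.CommandType;
    import org.apache.helix.tools.TestCommand.NodeOpArg;
    import org.apache.helix.tools.TestTrigger;

    public class TestCommandExample
    {
      public static void main(String[] args)
      {
        HelixManager manager = null; // placeholder; obtain a real manager in a test
        Thread participantThread = new Thread();
        NodeOpArg arg = new NodeOpArg(manager, participantThread);
        TestCommand start = new TestCommand(CommandType.START, new TestTrigger(), arg);
        System.out.println(start);
      }
    }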