Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[22/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
new file mode 100644
index 0000000..2230d97
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -0,0 +1,909 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.ClusterView;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.file.FileHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+
+
+public class ClusterStateVerifier
+{
+  public static String  cluster         = "cluster";
+  public static String  zkServerAddress = "zkSvr";
+  public static String  help            = "help";
+  public static String  timeout         = "timeout";
+  public static String  period          = "period";
+
+  private static Logger LOG             = Logger.getLogger(ClusterStateVerifier.class);
+
+  public interface Verifier
+  {
+    boolean verify();
+  }
+
+  public interface ZkVerifier extends Verifier
+  {
+    ZkClient getZkClient();
+
+    String getClusterName();
+  }
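+
+  // Example (hypothetical): a minimal custom Verifier that can be polled with
+  // verifyByPolling(); the path and variable names below are illustrative only.
+  //
+  //   Verifier liveInstanceVerifier = new Verifier()
+  //   {
+  //     @Override
+  //     public boolean verify()
+  //     {
+  //       return zkClient.countChildren("/test-cluster/LIVEINSTANCES") == expectedLiveNodes;
+  //     }
+  //   };
+  //   boolean ok = ClusterStateVerifier.verifyByPolling(liveInstanceVerifier, 30 * 1000);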
+
+  static class ExtViewVerifierZkListener implements IZkChildListener, IZkDataListener
+  {
+    final CountDownLatch _countDown;
+    final ZkClient       _zkClient;
+    final Verifier       _verifier;
+
+    public ExtViewVerifierZkListener(CountDownLatch countDown,
+                                    ZkClient zkClient,
+                                    ZkVerifier verifier)
+    {
+      _countDown = countDown;
+      _zkClient = zkClient;
+      _verifier = verifier;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception
+    {
+      boolean result = _verifier.verify();
+      if (result)
+      {
+        _countDown.countDown();
+      }
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception
+    {
+      // no-op: deleting an external view node also fires a child change on the
+      // parent path, which re-subscribes and re-runs verification
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+    {
+      for (String child : currentChilds)
+      {
+        String childPath =
+            parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
+        _zkClient.subscribeDataChanges(childPath, this);
+      }
+
+      boolean result = _verifier.verify();
+      if (result)
+      {
+        _countDown.countDown();
+      }
+    }
+
+  }
+
+  /**
+   * Verifier that checks that the external view matches the best possible
+   * state computed from the ideal states.
+   */
+  public static class BestPossAndExtViewZkVerifier implements ZkVerifier
+  {
+    private final String                           zkAddr;
+    private final String                           clusterName;
+    private final Map<String, Map<String, String>> errStates;
+    private final ZkClient                         zkClient;
+
+    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName)
+    {
+      this(zkAddr, clusterName, null);
+    }
+
+    public BestPossAndExtViewZkVerifier(String zkAddr,
+                                        String clusterName,
+                                        Map<String, Map<String, String>> errStates)
+    {
+      if (zkAddr == null || clusterName == null)
+      {
+        throw new IllegalArgumentException("requires zkAddr|clusterName");
+      }
+      this.zkAddr = zkAddr;
+      this.clusterName = clusterName;
+      this.errStates = errStates;
+      this.zkClient = ZKClientPool.getZkClient(zkAddr);
+    }
+
+    @Override
+    public boolean verify()
+    {
+      try
+      {
+        HelixDataAccessor accessor =
+            new ZKHelixDataAccessor(clusterName,
+                                    new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+      }
+      catch (Exception e)
+      {
+        LOG.error("exception in verification", e);
+      }
+      return false;
+    }
+
+    @Override
+    public ZkClient getZkClient()
+    {
+      return zkClient;
+    }
+
+    @Override
+    public String getClusterName()
+    {
+      return clusterName;
+    }
+
+    @Override
+    public String toString()
+    {
+      String verifierName = getClass().getName();
+      verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1);
+      return verifierName + "(" + clusterName + "@" + zkAddr + ")";
+    }
+  }
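+
+  // Example (hypothetical usage): block until the external view of cluster
+  // "test-cluster" converges to the best possible state, for up to 30 seconds:
+  //
+  //   boolean converged = ClusterStateVerifier.verifyByZkCallback(
+  //       new BestPossAndExtViewZkVerifier("localhost:2181", "test-cluster"), 30 * 1000);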
+
+  public static class BestPossAndExtViewFileVerifier implements Verifier
+  {
+    private final String                           rootPath;
+    private final String                           clusterName;
+    private final Map<String, Map<String, String>> errStates;
+    private final FilePropertyStore<ZNRecord>      fileStore;
+
+    public BestPossAndExtViewFileVerifier(String rootPath, String clusterName)
+    {
+      this(rootPath, clusterName, null);
+    }
+
+    public BestPossAndExtViewFileVerifier(String rootPath,
+                                          String clusterName,
+                                          Map<String, Map<String, String>> errStates)
+    {
+      if (rootPath == null || clusterName == null)
+      {
+        throw new IllegalArgumentException("requires rootPath|clusterName");
+      }
+      this.rootPath = rootPath;
+      this.clusterName = clusterName;
+      this.errStates = errStates;
+
+      this.fileStore =
+          new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+                                          rootPath,
+                                          new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+    }
+
+    @Override
+    public boolean verify()
+    {
+      try
+      {
+        HelixDataAccessor accessor = new FileHelixDataAccessor(fileStore, clusterName);
+
+        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+      }
+      catch (Exception e)
+      {
+        LOG.error("exception in verification", e);
+        return false;
+      }
+    }
+
+    @Override
+    public String toString()
+    {
+      String verifierName = getClass().getName();
+      verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1);
+      return verifierName + "(" + rootPath + "@" + clusterName + ")";
+    }
+  }
+
+  public static class MasterNbInExtViewVerifier implements ZkVerifier
+  {
+    private final String   zkAddr;
+    private final String   clusterName;
+    private final ZkClient zkClient;
+
+    public MasterNbInExtViewVerifier(String zkAddr, String clusterName)
+    {
+      if (zkAddr == null || clusterName == null)
+      {
+        throw new IllegalArgumentException("requires zkAddr|clusterName");
+      }
+      this.zkAddr = zkAddr;
+      this.clusterName = clusterName;
+      this.zkClient = ZKClientPool.getZkClient(zkAddr);
+    }
+
+    @Override
+    public boolean verify()
+    {
+      try
+      {
+        ZKHelixDataAccessor accessor =
+            new ZKHelixDataAccessor(clusterName,
+                                    new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+        return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
+      }
+      catch (Exception e)
+      {
+        LOG.error("exception in verification", e);
+      }
+      return false;
+    }
+
+    @Override
+    public ZkClient getZkClient()
+    {
+      return zkClient;
+    }
+
+    @Override
+    public String getClusterName()
+    {
+      return clusterName;
+    }
+
+  }
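+
+  // Example (hypothetical usage): poll once per second, for up to 10 seconds,
+  // until every partition in the external view has a MASTER replica:
+  //
+  //   boolean hasMasters = ClusterStateVerifier.verifyByPolling(
+  //       new MasterNbInExtViewVerifier("localhost:2181", "test-cluster"), 10 * 1000, 1000);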
+
+  static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+                                          Map<String, Map<String, String>> errStates)
+  {
+    try
+    {
+      Builder keyBuilder = accessor.keyBuilder();
+      // read cluster once and do verification
+      ClusterDataCache cache = new ClusterDataCache();
+      cache.refresh(accessor);
+
+      Map<String, IdealState> idealStates = cache.getIdealStates();
+      if (idealStates == null)
+      {
+        // an ideal state may be missing because the resource has been dropped;
+        // use a mutable map since empty ideal states may be added below
+        idealStates = new HashMap<String, IdealState>();
+      }
+
+      Map<String, ExternalView> extViews =
+          accessor.getChildValuesMap(keyBuilder.externalViews());
+      if (extViews == null)
+      {
+        extViews = Collections.emptyMap();
+      }
+
+      // if a resource has an externalView but no idealState,
+      // add an empty idealState for it
+      for (String resource : extViews.keySet())
+      {
+        if (!idealStates.containsKey(resource))
+        {
+          idealStates.put(resource, new IdealState(resource));
+        }
+      }
+      
+      // calculate best possible state
+      BestPossibleStateOutput bestPossOutput =
+          ClusterStateVerifier.calcBestPossState(cache);
+
+      // set error states
+      if (errStates != null)
+      {
+        for (String resourceName : errStates.keySet())
+        {
+          Map<String, String> partErrStates = errStates.get(resourceName);
+          for (String partitionName : partErrStates.keySet())
+          {
+            String instanceName = partErrStates.get(partitionName);
+            Map<String, String> partStateMap =
+                bestPossOutput.getInstanceStateMap(resourceName,
+                                                   new Partition(partitionName));
+            partStateMap.put(instanceName, "ERROR");
+          }
+        }
+      }
+
+      
+      for (String resourceName : idealStates.keySet())
+      {
+        ExternalView extView = extViews.get(resourceName);
+        if (extView == null)
+        {
+          LOG.info("externalView for " + resourceName + " is not available");
+          return false;
+        }
+
+        // step 0: remove empty map and DROPPED state from best possible state
+        Map<Partition, Map<String, String>> bpStateMap =
+            bestPossOutput.getResourceMap(resourceName);
+        Iterator<Entry<Partition, Map<String, String>>> iter =
+            bpStateMap.entrySet().iterator();
+        while (iter.hasNext())
+        {
+          Map.Entry<Partition, Map<String, String>> entry = iter.next();
+          Map<String, String> instanceStateMap = entry.getValue();
+          if (instanceStateMap.isEmpty())
+          {
+            iter.remove();
+          }
+          else
+          {
+            // remove instances with DROPPED state
+            Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+            while (insIter.hasNext())
+            {
+              Map.Entry<String, String> insEntry = insIter.next();
+              String state = insEntry.getValue();
+              if (state.equalsIgnoreCase("DROPPED"))
+              {
+                insIter.remove();
+              }   
+            }
+          }
+        }
+
+        // System.err.println("resource: " + resourceName + ", bpStateMap: " + bpStateMap);
+
+        // step 1: externalView and bestPossibleState have equal sizes
+        int extViewSize = extView.getRecord().getMapFields().size();
+        int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+        if (extViewSize != bestPossStateSize)
+        {
+          LOG.info("exterView size (" + extViewSize
+              + ") is different from bestPossState size (" + bestPossStateSize
+              + ") for resource: " + resourceName);
+          // System.out.println("extView: " + extView.getRecord().getMapFields());
+          // System.out.println("bestPossState: " +
+          // bestPossOutput.getResourceMap(resourceName));
+          return false;
+        }
+
+        // step 2: every entry in external view is contained in best possible state
+        for (String partition : extView.getRecord().getMapFields().keySet())
+        {
+          Map<String, String> evInstanceStateMap =
+              extView.getRecord().getMapField(partition);
+          Map<String, String> bpInstanceStateMap =
+              bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+
+          boolean result =
+              ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
+                                                               bpInstanceStateMap);
+          if (!result)
+          {
+            LOG.info("externalView is different from bestPossibleState for partition:"
+                + partition);
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+    catch (Exception e)
+    {
+      LOG.error("exception in verification", e);
+      return false;
+    }
+
+  }
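+
+  // The errStates argument maps resource -> partition -> instance that a test
+  // expects to be in ERROR. A sketch of how a caller might build it
+  // (resource/partition/instance names are illustrative):
+  //
+  //   Map<String, Map<String, String>> errStates =
+  //       new HashMap<String, Map<String, String>>();
+  //   errStates.put("TestDB", new HashMap<String, String>());
+  //   errStates.get("TestDB").put("TestDB_0", "localhost_12918");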
+
+  static boolean verifyMasterNbInExtView(HelixDataAccessor accessor)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+
+    Map<String, IdealState> idealStates =
+        accessor.getChildValuesMap(keyBuilder.idealStates());
+    if (idealStates == null || idealStates.size() == 0)
+    {
+      LOG.info("No resource idealState");
+      return true;
+    }
+
+    Map<String, ExternalView> extViews =
+        accessor.getChildValuesMap(keyBuilder.externalViews());
+    if (extViews == null || extViews.size() < idealStates.size())
+    {
+      LOG.info("No externalViews | externalView.size() < idealState.size()");
+      return false;
+    }
+
+    for (String resource : extViews.keySet())
+    {
+      int partitions = idealStates.get(resource).getNumPartitions();
+      Map<String, Map<String, String>> instanceStateMap =
+          extViews.get(resource).getRecord().getMapFields();
+      if (instanceStateMap.size() < partitions)
+      {
+        LOG.info("Number of externalViews (" + instanceStateMap.size()
+            + ") < partitions (" + partitions + ")");
+        return false;
+      }
+
+      for (String partition : instanceStateMap.keySet())
+      {
+        boolean foundMaster = false;
+        for (String instance : instanceStateMap.get(partition).keySet())
+        {
+          if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER"))
+          {
+            foundMaster = true;
+            break;
+          }
+        }
+        if (!foundMaster)
+        {
+          LOG.info("No MASTER for partition: " + partition);
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  static void runStage(ClusterEvent event, Stage stage) throws Exception
+  {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  /**
+   * Calculate the best possible state. Note that DROPPED states are not
+   * checked, since we provide an empty current state map when kicking off the
+   * BestPossibleStateCalcStage.
+   *
+   * @param cache the cluster data cache to compute against
+   * @return the best possible state output of the controller pipeline
+   * @throws Exception
+   */
+  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
+  {
+    ClusterEvent event = new ClusterEvent("sampleEvent");
+    event.addAttribute("ClusterDataCache", cache);
+
+    ResourceComputationStage rcState = new ResourceComputationStage();
+    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+
+    runStage(event, rcState);
+    runStage(event, csStage);
+    runStage(event, bpStage);
+
+    BestPossibleStateOutput output =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+
+    // System.out.println("output:" + output);
+    return output;
+  }
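+
+  // Example (hypothetical): inspect the computed assignment for one partition;
+  // resource and partition names are illustrative:
+  //
+  //   BestPossibleStateOutput output = calcBestPossState(cache);
+  //   Map<String, String> stateMap =
+  //       output.getInstanceStateMap("TestDB", new Partition("TestDB_0"));
+  //   // e.g. {localhost_12918=MASTER, localhost_12919=SLAVE}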
+
+  public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2)
+  {
+    if (map1 == null)
+    {
+      return map2 == null || map2.isEmpty();
+    }
+    if (map2 == null)
+    {
+      return map1.isEmpty();
+    }
+
+    boolean isEqual = true;
+    // verify size
+    if (map1.size() != map2.size())
+    {
+      isEqual = false;
+    }
+    // verify each <key, value> in map1 is contained in map2
+    for (K key : map1.keySet())
+    {
+      if (!map1.get(key).equals(map2.get(key)))
+      {
+        LOG.debug("different value for key: " + key + " (map1: " + map1.get(key)
+            + ", map2: " + map2.get(key) + ")");
+        isEqual = false;
+        break;
+      }
+    }
+    return isEqual;
+  }
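+
+  // compareMap() treats the maps as equal when they have the same size and
+  // every key in map1 maps to an equal value in map2. Example (hypothetical):
+  //
+  //   Map<String, String> a = new HashMap<String, String>();
+  //   a.put("localhost_12918", "MASTER");
+  //   Map<String, String> b = new HashMap<String, String>(a);
+  //   compareMap(a, b);  // true
+  //   b.put("localhost_12918", "SLAVE");
+  //   compareMap(a, b);  // false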
+
+  public static boolean verifyByPolling(Verifier verifier)
+  {
+    return verifyByPolling(verifier, 30 * 1000);
+  }
+
+  public static boolean verifyByPolling(Verifier verifier, long timeout)
+  {
+    return verifyByPolling(verifier, timeout, 1000);
+  }
+
+  public static boolean verifyByPolling(Verifier verifier, long timeout, long period)
+  {
+    long startTime = System.currentTimeMillis();
+    boolean result = false;
+    try
+    {
+      long curTime;
+      do
+      {
+        Thread.sleep(period);
+        result = verifier.verify();
+        if (result)
+        {
+          break;
+        }
+        curTime = System.currentTimeMillis();
+      }
+      while (curTime <= startTime + timeout);
+      return result;
+    }
+    catch (Exception e)
+    {
+      // log and fall through; the finally block reports the elapsed time
+      LOG.error("exception in verifyByPolling", e);
+    }
+    finally
+    {
+      long endTime = System.currentTimeMillis();
+
+      // debug
+      System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
+          + "ms to verify");
+
+    }
+    return false;
+  }
+
+  public static boolean verifyByZkCallback(ZkVerifier verifier)
+  {
+    return verifyByZkCallback(verifier, 30000);
+  }
+
+  public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout)
+  {
+    long startTime = System.currentTimeMillis();
+    CountDownLatch countDown = new CountDownLatch(1);
+    ZkClient zkClient = verifier.getZkClient();
+    String clusterName = verifier.getClusterName();
+
+    // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
+    // so that when analyzing the zk log, we know when a test ends
+    zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+
+    ExtViewVerifierZkListener listener =
+        new ExtViewVerifierZkListener(countDown, zkClient, verifier);
+
+    String extViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+    zkClient.subscribeChildChanges(extViewPath, listener);
+    for (String child : zkClient.getChildren(extViewPath))
+    {
+      String childPath =
+          extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+      zkClient.subscribeDataChanges(childPath, listener);
+    }
+
+    // do initial verify
+    boolean result = verifier.verify();
+    if (!result)
+    {
+      try
+      {
+        result = countDown.await(timeout, TimeUnit.MILLISECONDS);
+        if (!result)
+        {
+          // make a final try if timeout
+          result = verifier.verify();
+        }
+      }
+      catch (Exception e)
+      {
+        // interrupted while awaiting the callback latch; fall through and
+        // report the current result
+        LOG.error("exception in verifyByZkCallback", e);
+      }
+    }
+
+    // clean up
+    zkClient.unsubscribeChildChanges(extViewPath, listener);
+    for (String child : zkClient.getChildren(extViewPath))
+    {
+      String childPath =
+          extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+      zkClient.unsubscribeDataChanges(childPath, listener);
+    }
+
+    long endTime = System.currentTimeMillis();
+
+    zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+    // debug
+    System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
+
+    return result;
+  }
+
+  public static boolean verifyFileBasedClusterStates(String file,
+                                                     String instanceName,
+                                                     StateModelFactory<StateModel> stateModelFactory)
+  {
+    ClusterView clusterView = ClusterViewSerializer.deserialize(new File(file));
+    boolean ret = true;
+    int nonOfflineStateNr = 0;
+
+    // ideal_state for instance with name $instanceName
+    Map<String, String> instanceIdealStates = new HashMap<String, String>();
+    for (ZNRecord idealStateItem : clusterView.getPropertyList(PropertyType.IDEALSTATES))
+    {
+      Map<String, Map<String, String>> idealStates = idealStateItem.getMapFields();
+
+      for (Map.Entry<String, Map<String, String>> entry : idealStates.entrySet())
+      {
+        if (entry.getValue().containsKey(instanceName))
+        {
+          String state = entry.getValue().get(instanceName);
+          instanceIdealStates.put(entry.getKey(), state);
+        }
+      }
+    }
+
+    Map<String, StateModel> currentStateMap = stateModelFactory.getStateModelMap();
+
+    if (currentStateMap.size() != instanceIdealStates.size())
+    {
+      LOG.warn("Number of current states (" + currentStateMap.size() + ") mismatch "
+          + "number of ideal states (" + instanceIdealStates.size() + ")");
+      return false;
+    }
+
+    for (Map.Entry<String, String> entry : instanceIdealStates.entrySet())
+    {
+
+      String stateUnitKey = entry.getKey();
+      String idealState = entry.getValue();
+
+      if (!idealState.equalsIgnoreCase("offline"))
+      {
+        nonOfflineStateNr++;
+      }
+
+      if (!currentStateMap.containsKey(stateUnitKey))
+      {
+        LOG.warn("Current state does not contain " + stateUnitKey);
+        // return false;
+        ret = false;
+        continue;
+      }
+
+      String curState = currentStateMap.get(stateUnitKey).getCurrentState();
+      if (!idealState.equalsIgnoreCase(curState))
+      {
+        LOG.info("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState
+            + " ideal:" + idealState + " instance_name:" + instanceName);
+        // return false;
+        ret = false;
+        continue;
+      }
+    }
+
+    if (ret)
+    {
+      System.out.println(instanceName + ": verification succeeded");
+      LOG.info(instanceName + ": verification succeeded (" + nonOfflineStateNr + " states)");
+    }
+
+    return ret;
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help)
+                     .withDescription("Prints command-line options info")
+                     .create();
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(zkServerAddress)
+                     .withDescription("Provide zookeeper address")
+                     .create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption =
+        OptionBuilder.withLongOpt(cluster)
+                     .withDescription("Provide cluster name")
+                     .create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option timeoutOption =
+        OptionBuilder.withLongOpt(timeout)
+                     .withDescription("Timeout value for verification")
+                     .create();
+    timeoutOption.setArgs(1);
+    timeoutOption.setArgName("Timeout value (Optional), default=30s");
+
+    Option sleepIntervalOption =
+        OptionBuilder.withLongOpt(period)
+                     .withDescription("Polling period for verification")
+                     .create();
+    sleepIntervalOption.setArgs(1);
+    sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(timeoutOption);
+    options.addOption(sleepIntervalOption);
+
+    return options;
+  }
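+
+  // Example command line (hypothetical zk address and cluster name):
+  //
+  //   java org.apache.helix.tools.ClusterStateVerifier \
+  //       --zkSvr localhost:2181 --cluster test-cluster --timeout 30000 --period 1000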
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    helpFormatter.printHelp("java " + ClusterStateVerifier.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    }
+    catch (ParseException pe)
+    {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static boolean verifyState(String[] args)
+  {
+    // defaults below are used when no command-line args are given
+    String clusterName = "storage-cluster";
+    String zkServer = "localhost:2181";
+    long timeoutValue = 0;
+    long periodValue = 1000;
+
+    if (args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkServer = cmd.getOptionValue(zkServerAddress);
+      clusterName = cmd.getOptionValue(cluster);
+      String timeoutStr = cmd.getOptionValue(timeout);
+      String periodStr = cmd.getOptionValue(period);
+      if (timeoutStr != null)
+      {
+        try
+        {
+          timeoutValue = Long.parseLong(timeoutStr);
+        }
+        catch (Exception e)
+        {
+          System.err.println("Exception in converting " + timeoutStr
+              + " to long. Use default (0)");
+        }
+      }
+
+      if (periodStr != null)
+      {
+        try
+        {
+          periodValue = Long.parseLong(periodStr);
+        }
+        catch (Exception e)
+        {
+          System.err.println("Exception in converting " + periodStr
+              + " to long. Use default (1000)");
+        }
+      }
+
+    }
+    // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
+    // timeoutValue,
+    // periodValue);
+
+    return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
+                              timeoutValue);
+  }
+
+  public static void main(String[] args)
+  {
+    boolean result = verifyState(args);
+    System.out.println(result ? "Successful" : "Failed");
+    // exit with 0 on success so callers can check the exit code
+    System.exit(result ? 0 : 1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
new file mode 100644
index 0000000..c10da85
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
@@ -0,0 +1,177 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ClusterViewSerializer
+{
+  private static Logger logger = Logger.getLogger(ClusterViewSerializer.class);
+
+  public static void serialize(ClusterView view, File file)
+  {
+    ObjectMapper mapper = new ObjectMapper();
+
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    // serializationConfig.set(SerializationConfig.Feature.WRITE_NULL_PROPERTIES, true);
+
+    try
+    {
+      mapper.writeValue(file, view);
+    } 
+    catch (Exception e)
+    {
+      logger.error("Error during serialization of data:" + view, e);
+    }
+  }
+  
+  public static byte[] serialize(ClusterView view)
+  {
+    ObjectMapper mapper = new ObjectMapper();
+
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    // serializationConfig.set(SerializationConfig.Feature.WRITE_NULL_PROPERTIES, true);
+
+    StringWriter sw = new StringWriter();
+
+    try
+    {
+      mapper.writeValue(sw, view);
+      return sw.toString().getBytes();
+    } 
+    catch (Exception e)
+    {
+      logger.error("Error during serialization of data:" + view, e);
+    }
+
+    return new byte[0];
+  }
+
+  public static ClusterView deserialize(File file)
+  {
+    if (!file.exists())
+    {
+      logger.error(String.format("Static config file \"%s\" doesn't exist", file.getAbsolutePath()));
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+   
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    // deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+
+    try
+    {
+      ClusterView view = mapper.readValue(file, ClusterView.class);
+      return view;
+    } 
+    catch (Exception e)
+    {
+      logger.error("Error during deserialization of file:" + file.getAbsolutePath(), e);
+    }
+
+    return null;
+  }
+
+  
+  public static ClusterView deserialize(byte[] bytes)
+  {
+    ObjectMapper mapper = new ObjectMapper();
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    // deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+
+    try
+    {
+      ClusterView view = mapper.readValue(bais, ClusterView.class);
+      return view;
+    } 
+    catch (Exception e)
+    {
+      logger.error("Error during deserialization of bytes:" + new String(bytes), e);
+    }
+
+    return null;
+  }
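+
+  // Example (hypothetical): a view serialized with serialize() should
+  // round-trip through deserialize() to an equivalent ClusterView:
+  //
+  //   byte[] bytes = ClusterViewSerializer.serialize(view);
+  //   ClusterView copy = ClusterViewSerializer.deserialize(bytes);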
+
+  public static void main(String[] args) throws JsonGenerationException,
+      JsonMappingException, IOException
+  {
+    // temporary test only
+    // create fake db names and nodes
+    List<StaticFileHelixManager.DBParam> dbParams = new ArrayList<StaticFileHelixManager.DBParam>();
+    // dbParams.add(new FileBasedClusterManager.DBParam("BizFollow", 1));
+    dbParams.add(new StaticFileHelixManager.DBParam("BizProfile_qatest218a", 128));
+    // dbParams.add(new FileBasedClusterManager.DBParam("EspressoDB", 10));
+    // dbParams.add(new FileBasedClusterManager.DBParam("MailboxDB", 128));
+    // dbParams.add(new FileBasedClusterManager.DBParam("MyDB", 8));
+    // dbParams.add(new FileBasedClusterManager.DBParam("schemata", 1));
+    String[] nodesInfo = { "localhost:8900", "localhost:8901",
+                           "localhost:8902", "localhost:8903",
+                           "localhost:8904" };
+    // String[] nodesInfo = { "esv4-app75.stg.linkedin.com:12918" };
+    int replica = 0;
+
+    ClusterView view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, replica);
+    String file = "/tmp/cluster-view-bizprofile.json";
+    // ClusterViewSerializer serializer = new ClusterViewSerializer(file);
+
+    byte[] bytes = ClusterViewSerializer.serialize(view);
+    // logger.info("serialized bytes=" );
+    // logger.info(new String(bytes));
+    System.out.println("serialized bytes=");
+    System.out.println(new String(bytes));
+
+    ClusterView restoredView = ClusterViewSerializer.deserialize(bytes);
+    // logger.info(restoredView);
+
+    bytes = ClusterViewSerializer.serialize(restoredView);
+    // logger.info(new String(bytes));
+    System.out.println("restored cluster view=");
+    System.out.println(new String(bytes));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java b/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
new file mode 100644
index 0000000..9c202d4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
@@ -0,0 +1,626 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class IdealCalculatorByConsistentHashing
+{
+  /**
+   * Interface to calculate the hash function value of a string
+   */
+  public interface HashFunction
+  {
+    public int getHashValue(String key);
+  }
+
+  /**
+   * The default string hash function. Same as the default function used by
+   * Voldemort.
+   */
+  public static class FnvHash implements HashFunction
+  {
+    private static final long FNV_BASIS = 0x811c9dc5;
+    private static final long FNV_PRIME = (1 << 24) + 0x193;
+    public static final long FNV_BASIS_64 = 0xCBF29CE484222325L;
+    public static final long FNV_PRIME_64 = 1099511628211L;
+
+    public int hash(byte[] key)
+    {
+      long hash = FNV_BASIS;
+      for (int i = 0; i < key.length; i++)
+      {
+        hash ^= 0xFF & key[i];
+        hash *= FNV_PRIME;
+      }
+      return (int) hash;
+    }
+
+    public long hash64(long val)
+    {
+      long hashval = FNV_BASIS_64;
+      for (int i = 0; i < 8; i++)
+      {
+        long octet = val & 0x00ff;
+        val = val >> 8;
+        hashval = hashval ^ octet;
+        hashval = hashval * FNV_PRIME_64;
+      }
+      return Math.abs(hashval);
+    }
+
+    @Override
+    public int getHashValue(String key)
+    {
+      return hash(key.getBytes());
+    }
+
+  }
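+
+  // Example (hypothetical): map a partition name onto a 65536-slot hash ring;
+  // the modulo of an int hash can be negative, so it is folded back into range:
+  //
+  //   FnvHash hashFunc = new FnvHash();
+  //   int pos = hashFunc.getHashValue("espressoDB1.partition-3") % 65536;
+  //   if (pos < 0)
+  //   {
+  //     pos += 65536;
+  //   }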
+
+  /**
+   * Calculate the ideal state for a list of instances using consistent
+   * hashing, with a default hash ring size of 65536.
+   *
+   * @param instanceNames
+   *          List of instance names.
+   * @param partitions
+   *          the partition number of the database
+   * @param replicas
+   *          the replication degree
+   * @param dbName
+   *          the name of the database
+   * @param hashFunc
+   *          the string hash function
+   * @return The ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames,
+      int partitions, int replicas, String dbName, HashFunction hashFunc)
+  {
+    return calculateIdealState(instanceNames, partitions, replicas, dbName,
+        hashFunc, 65536);
+  }
+
+  /**
+   * Calculate the ideal state for a list of instances using consistent
+   * hashing.
+   *
+   * @param instanceNames
+   *          List of instance names.
+   * @param partitions
+   *          the partition number of the database
+   * @param replicas
+   *          the replication degree
+   * @param dbName
+   *          the name of the database
+   * @param hashFunc
+   *          the string hash function
+   * @param hashRingSize
+   *          the size of the hash ring used by consistent hashing
+   * @return The ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames,
+      int partitions, int replicas, String dbName, HashFunction hashFunc,
+      int hashRingSize)
+  {
+    ZNRecord result = new ZNRecord(dbName);
+
+    int[] hashRing = generateEvenHashRing(instanceNames, hashRingSize);
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    Random rand = new Random(0xc0ffee);
+    for (int i = 0; i < partitions; i++)
+    {
+      String partitionName = dbName + ".partition-" + i;
+      int hashPos = rand.nextInt() % hashRingSize;
+      // (int)(hashFunc.getHashValue(partitionName) % hashRingSize);
+      hashPos = hashPos < 0 ? (hashPos + hashRingSize) : hashPos;
+      // System.out.print(hashPos+ " ");
+      // if(i % 120 == 0) System.out.println();
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+      // the first in the list is the node that contains the master
+      int masterPos = hashRing[hashPos];
+      partitionAssignment.put(instanceNames.get(masterPos), "MASTER");
+
+      // partitionAssignment.put("hash", "" + hashPos + " " + masterPos);
+
+      // Put slaves on the next hash ring positions, making sure that no node
+      // is assigned more than one replica of the same partition.
+      for (int j = 1; j <= replicas; j++)
+      {
+        String next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+        while (partitionAssignment.containsKey(next))
+        {
+          hashPos++;
+          next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+        }
+        partitionAssignment.put(next, "SLAVE");
+      }
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    return result;
+  }
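+
+  // Example (hypothetical): a 2-replica ideal state for 200 partitions over
+  // ten instances, as also exercised in main() below:
+  //
+  //   ZNRecord idealState = calculateIdealState(instanceNames, 200, 2,
+  //       "espressoDB1", new FnvHash());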
+
+  /**
+   * Generate the hash ring for consistent hashing.
+   *
+   * @param instanceNames
+   *          List of instance names.
+   * @param hashRingSize
+   *          the size of the hash ring used by consistent hashing
+   * @return The int array representing the hash ring; it contains values
+   *         ranging from 0 to instanceNames.size() - 1
+   */
+  public static int[] generateHashRing(List<String> instanceNames,
+      int hashRingSize)
+  {
+    int[] result = new int[hashRingSize];
+    for (int i = 0; i < result.length; i++)
+    {
+      result[i] = 0;
+    }
+    int instances = instanceNames.size();
+    // The following code generates the random distribution
+    for (int i = 1; i < instances; i++)
+    {
+      putNodeOnHashring(result, i, hashRingSize / (i + 1), i);
+    }
+    return result;
+  }
+
+  public static int[] generateEvenHashRing(List<String> instanceNames,
+      int hashRingSize)
+  {
+    int[] result = new int[hashRingSize];
+    for (int i = 0; i < result.length; i++)
+    {
+      result[i] = 0;
+    }
+    int instances = instanceNames.size();
+    // The following code generates the random distribution
+    for (int i = 1; i < instances; i++)
+    {
+      putNodeEvenOnHashRing(result, i, i + 1);
+    }
+    return result;
+  }
+
+  private static void putNodeEvenOnHashRing(int[] hashRing, int nodeVal,
+      int totalValues)
+  {
+    int newValNum = hashRing.length / totalValues;
+    assert (newValNum > 0);
+    Map<Integer, List<Integer>> valueIndex = buildValueIndex(hashRing);
+    int nSources = valueIndex.size();
+    int remainder = newValNum % nSources;
+
+    List<List<Integer>> positionLists = new ArrayList<List<Integer>>();
+    for (List<Integer> list : valueIndex.values())
+    {
+      positionLists.add(list);
+    }
+    class ListComparator implements Comparator<List<Integer>>
+    {
+      @Override
+      public int compare(List<Integer> o1, List<Integer> o2)
+      {
+        return (o1.size() > o2.size() ? -1 : (o1.size() == o2.size() ? 0 : 1));
+      }
+    }
+    Collections.sort(positionLists, new ListComparator());
+
+    for (List<Integer> oldValPositions : positionLists)
+    {
+      // List<Integer> oldValPositions = valueIndex.get(oldVal);
+      int nValsToReplace = newValNum / nSources;
+      assert (nValsToReplace > 0);
+      if (remainder > 0)
+      {
+        nValsToReplace++;
+        remainder--;
+      }
+      // System.out.print(oldValPositions.size()+" "+nValsToReplace+"  ");
+      // randomly take nValsToReplace positions in oldValPositions and
+      // overwrite them with the new node value
+      putNodeValueOnHashRing(hashRing, nodeVal, nValsToReplace, oldValPositions);
+    }
+    // System.out.println();
+  }
+
+  private static void putNodeValueOnHashRing(int[] hashRing, int nodeVal,
+      int numberOfValues, List<Integer> positions)
+  {
+    Random rand = new Random(nodeVal);
+    // initialize the index array
+    int[] index = new int[positions.size()];
+    for (int i = 0; i < index.length; i++)
+    {
+      index[i] = i;
+    }
+
+    int nodesLeft = index.length;
+
+    for (int i = 0; i < numberOfValues; i++)
+    {
+      // Calculate a random index
+      int randIndex = rand.nextInt() % nodesLeft;
+      if (randIndex < 0)
+      {
+        randIndex += nodesLeft;
+      }
+      hashRing[positions.get(index[randIndex])] = nodeVal;
+
+      // swap the random index and the last available index, and decrease the
+      // nodes left
+      int temp = index[randIndex];
+      index[randIndex] = index[nodesLeft - 1];
+      index[nodesLeft - 1] = temp;
+      nodesLeft--;
+    }
+  }
+
+  private static Map<Integer, List<Integer>> buildValueIndex(int[] hashRing)
+  {
+    Map<Integer, List<Integer>> result = new TreeMap<Integer, List<Integer>>();
+    for (int i = 0; i < hashRing.length; i++)
+    {
+      if (!result.containsKey(hashRing[i]))
+      {
+        List<Integer> list = new ArrayList<Integer>();
+        result.put(hashRing[i], list);
+      }
+      result.get(hashRing[i]).add(i);
+    }
+    return result;
+  }
+
+  /**
+   * Uniformly put node values on the hash ring. Derived from the shuffling
+   * algorithm
+   *
+   * @param result
+   *          the hash ring array.
+   * @param nodeValue
+   *          the int value to be added to the hash ring this time
+   * @param numberOfNodes
+   *          number of node values to put on the hash ring array
+   * @param randomSeed
+   *          the random seed
+   */
+  public static void putNodeOnHashring(int[] result, int nodeValue,
+      int numberOfNodes, int randomSeed)
+  {
+    Random rand = new Random(randomSeed);
+    // initialize the index array
+    int[] index = new int[result.length];
+    for (int i = 0; i < index.length; i++)
+    {
+      index[i] = i;
+    }
+
+    int nodesLeft = index.length;
+
+    for (int i = 0; i < numberOfNodes; i++)
+    {
+      // Calculate a random index
+      int randIndex = rand.nextInt() % nodesLeft;
+      if (randIndex < 0)
+      {
+        randIndex += nodesLeft;
+      }
+      if (result[index[randIndex]] == nodeValue)
+      {
+        assert (false);
+      }
+      result[index[randIndex]] = nodeValue;
+
+      // swap the random index and the last available index, and decrease the
+      // nodes left
+      int temp = index[randIndex];
+      index[randIndex] = index[nodesLeft - 1];
+      index[nodesLeft - 1] = temp;
+
+      nodesLeft--;
+    }
+  }
+
+  /**
+   * Helper function to count how many partition assignments differ between
+   * two ideal states.
+   */
+  public static void printDiff(ZNRecord record1, ZNRecord record2)
+  {
+    int diffCount = 0;
+    for (String key : record1.getMapFields().keySet())
+    {
+      Map<String, String> map1 = record1.getMapField(key);
+      Map<String, String> map2 = record2.getMapField(key);
+
+      for (String k : map1.keySet())
+      {
+        if (!map2.containsKey(k))
+        {
+          diffCount++;
+        } else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
+        {
+          diffCount++;
+        }
+      }
+    }
+    System.out.println("diff count = " + diffCount);
+  }
+
+  /**
+   * Helper function to count the positions at which two hash rings differ.
+   */
+  public static void compareHashrings(int[] ring1, int[] ring2)
+  {
+    int diff = 0;
+    for (int i = 0; i < ring1.length; i++)
+    {
+      if (ring1[i] != ring2[i])
+      {
+        diff++;
+      }
+    }
+    System.out.println("ring diff: " + diff);
+  }
+
+  public static void printNodeOfflineOverhead(ZNRecord record)
+  {
+    // build node -> partition map
+    Map<String, Set<String>> nodeNextMap = new TreeMap<String, Set<String>>();
+    for (String partitionName : record.getMapFields().keySet())
+    {
+      Map<String, String> map1 = record.getMapField(partitionName);
+      String master = "", slave = "";
+      for (String nodeName : map1.keySet())
+      {
+        if (!nodeNextMap.containsKey(nodeName))
+        {
+          nodeNextMap.put(nodeName, new TreeSet<String>());
+        }
+
+        // String master = "", slave = "";
+        if (map1.get(nodeName).equalsIgnoreCase("MASTER"))
+        {
+          master = nodeName;
+        } else
+        {
+          if (slave.equalsIgnoreCase(""))
+          {
+            slave = nodeName;
+          }
+        }
+
+      }
+      nodeNextMap.get(master).add(slave);
+    }
+    System.out.println("next count: ");
+    for (String key : nodeNextMap.keySet())
+    {
+      System.out.println(nodeNextMap.get(key).size() + " ");
+    }
+    System.out.println();
+  }
+
+  /**
+   * Helper function to calculate and print the standard deviation of the
+   * partition assignment in an ideal state, as well as the min/max number of
+   * partitions in the given state hosted on each node.
+   */
+  public static void printIdealStateStats(ZNRecord record, String value)
+  {
+    Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+    for (String key : record.getMapFields().keySet())
+    {
+      Map<String, String> map1 = record.getMapField(key);
+      for (String k : map1.keySet())
+      {
+        if (!countsMap.containsKey(k))
+        {
+          countsMap.put(k, 0);
+        }
+        if (value.equals("") || map1.get(k).equalsIgnoreCase(value))
+        {
+          countsMap.put(k, countsMap.get(k).intValue() + 1);
+        }
+      }
+    }
+    double sum = 0;
+    int maxCount = 0;
+    int minCount = Integer.MAX_VALUE;
+
+    System.out.println("Partition distributions: ");
+    for (String k : countsMap.keySet())
+    {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count)
+      {
+        maxCount = count;
+      }
+      if (minCount > count)
+      {
+        minCount = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println();
+    double mean = sum / (countsMap.size());
+    // calculate the deviation of the node distribution
+    double deviation = 0;
+    for (String k : countsMap.keySet())
+    {
+      double count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("Mean: " + mean + " normal deviation:"
+        + Math.sqrt(deviation / countsMap.size()));
+
+    System.out.println("Max count: " + maxCount + " min count:" + minCount);
+    /*
+     * int steps = 10; int stepLen = (maxCount - minCount)/steps; List<Integer>
+     * histogram = new ArrayList<Integer>((maxCount - minCount)/stepLen + 1);
+     *
+     * for(int i = 0; i< (maxCount - minCount)/stepLen + 1; i++) {
+     * histogram.add(0); } for(String k :countsMap.keySet()) { int count =
+     * countsMap.get(k); int stepNo = (count - minCount)/stepLen;
+     * histogram.set(stepNo, histogram.get(stepNo) +1); }
+     * System.out.println("histogram:"); for(Integer x : histogram) {
+     * System.out.print(x+" "); }
+     */
+  }
+
+  public static void printHashRingStat(int[] hashRing)
+  {
+    double sum = 0, mean = 0, deviation = 0;
+    Map<Integer, Integer> countsMap = new TreeMap<Integer, Integer>();
+    for (int i = 0; i < hashRing.length; i++)
+    {
+      if (!countsMap.containsKey(hashRing[i]))
+      {
+        countsMap.put(hashRing[i], 0);
+      }
+      countsMap.put(hashRing[i], countsMap.get(hashRing[i]).intValue() + 1);
+    }
+    int maxCount = Integer.MIN_VALUE;
+    int minCount = Integer.MAX_VALUE;
+    for (Integer k : countsMap.keySet())
+    {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count)
+      {
+        maxCount = count;
+      }
+      if (minCount > count)
+      {
+        minCount = count;
+      }
+    }
+    mean = sum / countsMap.size();
+    for (Integer k : countsMap.keySet())
+    {
+      int count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("hashring Mean: " + mean + " normal deviation:"
+        + Math.sqrt(deviation / countsMap.size()));
+
+  }
+
+  static int[] getFnvHashArray(List<String> strings)
+  {
+    int[] result = new int[strings.size()];
+    int i = 0;
+    IdealCalculatorByConsistentHashing.FnvHash hashfunc = new IdealCalculatorByConsistentHashing.FnvHash();
+    for (String s : strings)
+    {
+      int val = hashfunc.getHashValue(s) % 65536;
+      if (val < 0)
+        val += 65536;
+      result[i++] = val;
+    }
+    return result;
+  }
+
+  static void printArrayStat(int[] vals)
+  {
+    double sum = 0, mean = 0, deviation = 0;
+
+    for (int i = 0; i < vals.length; i++)
+    {
+      sum += vals[i];
+    }
+    mean = sum / vals.length;
+    for (int i = 0; i < vals.length; i++)
+    {
+      deviation += (mean - vals[i]) * (mean - vals[i]);
+    }
+    System.out.println("normalied deviation: "
+        + Math.sqrt(deviation / vals.length) / mean);
+  }
+
+  public static void main(String args[]) throws Exception
+  {
+    // Test the hash ring generation
+    List<String> instanceNames = new ArrayList<String>();
+    for (int i = 0; i < 10; i++)
+    {
+      instanceNames.add("localhost_123" + i);
+    }
+
+    // int[] ring1 =
+    // IdealCalculatorByConsistentHashing.generateEvenHashRing(instanceNames,
+    // 65535);
+    // printHashRingStat(ring1);
+    // int[] ring1 = getFnvHashArray(instanceNames);
+    // printArrayStat(ring1);
+
+    int partitions = 200, replicas = 2;
+    String dbName = "espressoDB1";
+
+    ZNRecord result = IdealCalculatorByConsistentHashing.calculateIdealState(
+        instanceNames, partitions, replicas, dbName,
+        new IdealCalculatorByConsistentHashing.FnvHash());
+    System.out.println("\nMaster :");
+    printIdealStateStats(result, "MASTER");
+
+    System.out.println("\nSlave :");
+    printIdealStateStats(result, "SLAVE");
+
+    System.out.println("\nTotal :");
+    printIdealStateStats(result, "");
+
+    printNodeOfflineOverhead(result);
+    /*
+     * ZNRecordSerializer serializer = new ZNRecordSerializer();
+     * byte[] bytes = serializer.serialize(result);
+     * // System.out.println(new String(bytes));
+     *
+     * List<String> instanceNames2 = new ArrayList<String>();
+     * for (int i = 0; i < 40; i++)
+     * {
+     *   instanceNames2.add("localhost_123" + i);
+     * }
+     *
+     * ZNRecord result2 = IdealCalculatorByConsistentHashing.calculateIdealState(
+     *     instanceNames2, partitions, replicas, dbName,
+     *     new IdealCalculatorByConsistentHashing.FnvHash());
+     *
+     * printDiff(result, result2);
+     *
+     * // IdealCalculatorByConsistentHashing.printIdealStateStats(result2);
+     *
+     * int[] ring2 = IdealCalculatorByConsistentHashing.generateHashRing(
+     *     instanceNames2, 30000);
+     *
+     * IdealCalculatorByConsistentHashing.compareHashrings(ring1, ring2);
+     * // printNodeStats(result);
+     * // printNodeStats(result2);
+     * bytes = serializer.serialize(result2);
+     * printHashRingStat(ring2);
+     * // System.out.println(new String(bytes));
+     */
+
+  }
+}

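A side note on printArrayStat above: the number it reports is the coefficient
of variation (standard deviation divided by the mean) of the ring positions.
A minimal standalone sketch of the same statistic, for reuse outside the
class (the method name coefficientOfVariation is illustrative, not part of
the file above):

    static double coefficientOfVariation(int[] vals)
    {
      double sum = 0;
      for (int v : vals)
      {
        sum += v;
      }
      double mean = sum / vals.length;
      double sqDiff = 0;
      for (int v : vals)
      {
        // accumulate squared distance from the mean
        sqDiff += (v - mean) * (v - mean);
      }
      // relative spread: standard deviation as a fraction of the mean
      return Math.sqrt(sqDiff / vals.length) / mean;
    }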
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
new file mode 100644
index 0000000..d1c7170
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
@@ -0,0 +1,326 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class IdealStateCalculatorByRush
+{
+  /**
+   * Build the config map for the RUSH algorithm. The input of the RUSH
+   * algorithm groups nodes into "clusters", and different clusters can be
+   * assigned different weights.
+   *
+   * @param numClusters
+   *          number of node clusters
+   * @param instancesPerCluster
+   *          list of clusters, each containing a list of node name strings
+   * @param replicationDegree
+   *          the replication degree
+   * @param clusterWeights
+   *          the weight for each node cluster
+   * @return the config map structure for the RUSH algorithm
+   */
+  static HashMap<String, Object> buildRushConfig(int numClusters,
+      List<List<String>> instancesPerCluster, int replicationDegree,
+      List<Integer> clusterWeights)
+  {
+    HashMap<String, Object> config = new HashMap<String, Object>();
+    config.put("replicationDegree", replicationDegree);
+
+    HashMap[] clusterList = new HashMap[numClusters];
+    config.put("subClusters", clusterList);
+
+    HashMap[] nodes;
+    HashMap<String, String> node;
+    HashMap<String, Object> clusterData;
+    for (int n = 0; n < numClusters; n++)
+    {
+      int numNodes = instancesPerCluster.get(n).size();
+      nodes = new HashMap[numNodes];
+      for (int i = 0; i < numNodes; i++)
+      {
+        node = new HashMap<String, String>();
+        node.put("partition", instancesPerCluster.get(n).get(i));
+        nodes[i] = node;
+      }
+      clusterData = new HashMap<String, Object>();
+      clusterData.put("weight", clusterWeights.get(n));
+      clusterData.put("nodes", nodes);
+      clusterList[n] = clusterData;
+    }
+    return config;
+  }
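+
+  // For a single two-node cluster with weight 1 and replicationDegree 2,
+  // buildRushConfig produces a map shaped roughly like the following
+  // (node names are illustrative):
+  //
+  //   { "replicationDegree" : 2,
+  //     "subClusters" : [ { "weight" : 1,
+  //                         "nodes"  : [ { "partition" : "localhost_1230" },
+  //                                      { "partition" : "localhost_1231" } ] } ] }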
+
+  /**
+   * Calculate the ideal state for a list of instance clusters.
+   *
+   * @param instanceClusters
+   *          list of clusters, each containing a list of node name strings
+   * @param instanceClusterWeights
+   *          the weight for each instance cluster
+   * @param partitions
+   *          the number of partitions of the database
+   * @param replicas
+   *          the replication degree
+   * @param dbName
+   *          the name of the database
+   * @return the ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(
+      List<List<String>> instanceClusters,
+      List<Integer> instanceClusterWeights, int partitions, int replicas,
+      String dbName) throws Exception
+  {
+    ZNRecord result = new ZNRecord(dbName);
+
+    int numberOfClusters = instanceClusters.size();
+    List<List<String>> nodesInClusters = instanceClusters;
+    List<Integer> clusterWeights = instanceClusterWeights;
+
+    HashMap<String, Object> rushConfig = buildRushConfig(numberOfClusters,
+        nodesInClusters, replicas + 1, clusterWeights);
+    RUSHrHash rushHash = new RUSHrHash(rushConfig);
+
+    Random r = new Random(0);
+    for (int i = 0; i < partitions; i++)
+    {
+      int partitionId = i;
+      String partitionName = dbName + ".partition-" + partitionId;
+
+      ArrayList<HashMap> partitionAssignmentResult = rushHash
+          .findNode(i);
+      List<String> nodeNames = new ArrayList<String>();
+      for (HashMap p : partitionAssignmentResult)
+      {
+        for (Object key : p.keySet())
+        {
+          if (p.get(key) instanceof String)
+          {
+            nodeNames.add(p.get(key).toString());
+          }
+        }
+      }
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+
+      for (int j = 0; j < nodeNames.size(); j++)
+      {
+        partitionAssignment.put(nodeNames.get(j), "SLAVE");
+      }
+      int master = r.nextInt(nodeNames.size());
+      //master = nodeNames.size()/2;
+      partitionAssignment.put(nodeNames.get(master), "MASTER");
+
+
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    return result;
+  }
+
+  public static ZNRecord calculateIdealState(
+      List<String> instanceClusters,
+      int instanceClusterWeight, int partitions, int replicas,
+      String dbName) throws Exception
+  {
+    List<List<String>> instanceClustersList = new ArrayList<List<String>>();
+    instanceClustersList.add(instanceClusters);
+
+    List<Integer> instanceClusterWeightList = new ArrayList<Integer>();
+    instanceClusterWeightList.add(instanceClusterWeight);
+
+    return calculateIdealState(
+        instanceClustersList,
+        instanceClusterWeightList, partitions, replicas,
+        dbName);
+  }
+
+  /**
+   * Helper function to see how many partitions are mapped to different
+   * instances in two ideal states.
+   */
+  public static void printDiff(ZNRecord record1, ZNRecord record2)
+  {
+    int diffCount = 0;
+    int diffCountMaster = 0;
+    for (String key : record1.getMapFields().keySet())
+    {
+      Map<String, String> map1 = record1.getMapField(key);
+      Map<String, String> map2 = record2.getMapField(key);
+
+      for (String k : map1.keySet())
+      {
+        if (!map2.containsKey(k))
+        {
+          diffCount++;
+        }
+        else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
+        {
+          diffCountMaster++;
+        }
+      }
+    }
+    System.out.println("\ndiff count = " + diffCount);
+    System.out.println("\nmaster diff count:"+ diffCountMaster);
+  }
+
+  /**
+   * Helper function to calculate and print the standard deviation of the
+   * partition assignment in an ideal state.
+   */
+  public static void printIdealStateStats(ZNRecord record)
+  {
+    Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+    Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
+    for (String key : record.getMapFields().keySet())
+    {
+      Map<String, String> map1 = record.getMapField(key);
+      for (String k : map1.keySet())
+      {
+        if (!countsMap.containsKey(k))
+        {
+          countsMap.put(k, 1);
+        }
+        else
+        {
+          countsMap.put(k, countsMap.get(k) + 1);
+        }
+        if (!masterCountsMap.containsKey(k))
+        {
+          masterCountsMap.put(k, 0);
+        }
+        if (map1.get(k).equalsIgnoreCase("MASTER"))
+        {
+          masterCountsMap.put(k, masterCountsMap.get(k) + 1);
+        }
+      }
+    }
+    double sum = 0;
+    int maxCount = 0;
+    int minCount = Integer.MAX_VALUE;
+    for (String k : countsMap.keySet())
+    {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count)
+      {
+        maxCount = count;
+      }
+      if (minCount > count)
+      {
+        minCount = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
+    System.out.println("\n master:");
+    double sumMaster = 0;
+    int maxCountMaster = 0;
+    int minCountMaster = Integer.MAX_VALUE;
+    for (String k : masterCountsMap.keySet())
+    {
+      int count = masterCountsMap.get(k);
+      sumMaster += count;
+      if (maxCountMaster < count)
+      {
+        maxCountMaster = count;
+      }
+      if (minCountMaster > count)
+      {
+        minCountMaster = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println("\nMean master: "+ 1.0*sumMaster/countsMap.size());
+    System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
+    double mean = sum / (countsMap.size());
+    // calculate the deviation of the node distribution
+    double deviation = 0;
+    for (String k : countsMap.keySet())
+    {
+      double count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("Mean: " + mean + " normal deviation:"
+        + Math.sqrt(deviation / countsMap.size()) / mean);
+
+    //System.out.println("Max count: " + maxCount + " min count:" + minCount);
+    int steps = 10;
+    int stepLen = (maxCount - minCount) / steps;
+    if(stepLen == 0) return;
+    List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount)
+        / stepLen + 1);
+
+    for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++)
+    {
+      histogram.add(0);
+    }
+    for (String k : countsMap.keySet())
+    {
+      int count = countsMap.get(k);
+      int stepNo = (count - minCount) / stepLen;
+      histogram.set(stepNo, histogram.get(stepNo) + 1);
+    }
+    System.out.println("histogram:");
+    for (Integer x : histogram)
+    {
+      System.out.print(x + " ");
+    }
+  }
+
+  public static void main(String args[]) throws Exception
+  {
+    int partitions = 4096, replicas = 2;
+    String dbName = "espressoDB1";
+    List<String> instanceNames = new ArrayList<String>();
+    List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
+    for (int i = 0; i < 20; i++)
+    {
+      instanceNames.add("local"+i+"host_123" + i);
+    }
+    instanceCluster1.add(instanceNames);
+    List<Integer> weights1 = new ArrayList<Integer>();
+    weights1.add(1);
+    ZNRecord result = IdealStateCalculatorByRush.calculateIdealState(
+        instanceCluster1, weights1, partitions, replicas, dbName);
+
+    printIdealStateStats(result);
+
+    List<String> instanceNames2 = new ArrayList<String>();
+    for (int i = 400; i < 405; i++)
+    {
+      instanceNames2.add("localhost_123" + i);
+    }
+    instanceCluster1.add(instanceNames2);
+    weights1.add(1);
+    ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
+        instanceCluster1, weights1, partitions, replicas, dbName);
+
+    printDiff(result, result2);
+    printIdealStateStats(result2);
+  }
+}

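The per-node master counts that printIdealStateStats prints can also be
recomputed directly from the returned ZNRecord. A minimal sketch, assuming
the same imports as the file above (the helper name countMasters is
illustrative, not part of the committed class):

    static Map<String, Integer> countMasters(ZNRecord record)
    {
      Map<String, Integer> masters = new TreeMap<String, Integer>();
      // each map field holds one partition's node -> MASTER/SLAVE assignment
      for (Map<String, String> assignment : record.getMapFields().values())
      {
        for (Map.Entry<String, String> e : assignment.entrySet())
        {
          if ("MASTER".equalsIgnoreCase(e.getValue()))
          {
            Integer c = masters.get(e.getKey());
            masters.put(e.getKey(), c == null ? 1 : c + 1);
          }
        }
      }
      return masters;
    }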
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
new file mode 100644
index 0000000..39bd5d2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+/*
+ * Ideal state calculator for the cluster manager V1. The ideal state is
+ * calculated by randomly assigning master partitions to storage nodes.
+ *
+ * Note that the following code is a naive strategy and is for cluster manager
+ * V1 only. We will use another algorithm to calculate the ideal state in
+ * future milestones.
+ */
+
+public class IdealStateCalculatorByShuffling
+{
+  /*
+   * Given the number of nodes, partitions and replicas, calculate the ideal
+   * state in the following manner. For the master partition assignment:
+   * 1. construct an ArrayList partitionList with partitionList[i] = i;
+   * 2. shuffle the partition list;
+   * 3. scan the shuffled list and assign partitionList[i] to node (i % nodes).
+   *
+   * For the slave partitions, simply put them on nodes after the node that
+   * holds the master partition.
+   *
+   * The result of the method is a ZNRecord, which contains a list of maps;
+   * each map is from node names to either "MASTER" or "SLAVE".
+   */
+
+  public static ZNRecord calculateIdealState(List<String> instanceNames,
+      int partitions, int replicas, String dbName, long randomSeed)
+  {
+    return calculateIdealState(instanceNames, partitions, replicas, dbName, randomSeed, "MASTER", "SLAVE");
+  }
+  public static ZNRecord calculateIdealState(List<String> instanceNames,
+      int partitions, int replicas, String dbName, long randomSeed, String masterValue, String slaveValue)
+  {
+    if (instanceNames.size() <= replicas)
+    {
+      throw new IllegalArgumentException(
+          "Replicas must be less than number of storage nodes");
+    }
+
+    Collections.sort(instanceNames);
+
+    ZNRecord result = new ZNRecord(dbName);
+
+    List<Integer> partitionList = new ArrayList<Integer>(partitions);
+    for (int i = 0; i < partitions; i++)
+    {
+      partitionList.add(new Integer(i));
+    }
+    Random rand = new Random(randomSeed);
+    // Shuffle the partition list
+    Collections.shuffle(partitionList, rand);
+
+    for (int i = 0; i < partitionList.size(); i++)
+    {
+      int partitionId = partitionList.get(i);
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+      int masterNode = i % instanceNames.size();
+      // the first in the list is the node that contains the master
+      partitionAssignment.put(instanceNames.get(masterNode), masterValue);
+
+      // for the jth replica, start from node (masterNode + j * partitions) %
+      // nodes and scan forward to the next node that does not yet hold a
+      // replica of this partition
+      for (int j = 1; j <= replicas; j++)
+      {
+        int index = (masterNode + j * partitionList.size())
+            % instanceNames.size();
+        while (partitionAssignment.containsKey(instanceNames.get(index)))
+        {
+          index = (index + 1) % instanceNames.size();
+        }
+        partitionAssignment.put(instanceNames.get(index), slaveValue);
+      }
+      String partitionName = dbName + "_" + partitionId;
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    return result;
+  }
+
+  public static ZNRecord calculateIdealState(List<String> instanceNames,
+      int partitions, int replicas, String dbName)
+  {
+    // the seed is a constant so that the shuffle always gives the same result
+    long randomSeed = 888997632;
+    return calculateIdealState(instanceNames, partitions, replicas, dbName,
+        randomSeed);
+  }
+}
\ No newline at end of file

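A minimal invocation sketch for the shuffling calculator above (node names
and counts are illustrative; the four-argument overload uses the fixed seed,
so repeated runs yield the same assignment):

    List<String> nodes = new ArrayList<String>();
    for (int i = 0; i < 5; i++)
    {
      nodes.add("localhost_1234" + i);
    }
    // 8 partitions, 2 slaves per master
    ZNRecord idealState =
        IdealStateCalculatorByShuffling.calculateIdealState(nodes, 8, 2, "testDB");
    // each map field holds one partition's node -> MASTER/SLAVE assignment
    System.out.println(idealState.getMapField("testDB_0"));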
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
new file mode 100644
index 0000000..6e84439
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
@@ -0,0 +1,101 @@
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+
+public class IdealStateCalculatorForEspressoRelay
+{
+  public static IdealState calculateRelayIdealState(List<String> partitions, List<String> instances, 
+        String resultRecordName, int replica, String firstValue, String restValue, String stateModelName)
+  {
+    Collections.sort(partitions);
+    Collections.sort(instances);
+    if (instances.size() % replica != 0)
+    {
+      throw new HelixException("Number of instances must be divisible by the replica count");
+    }
+    
+    IdealState result = new IdealState(resultRecordName);
+    result.setNumPartitions(partitions.size());
+    result.setReplicas("" + replica);
+    result.setStateModelDefRef(stateModelName);
+    
+    int groups = instances.size() / replica;
+    int remainder = instances.size() % replica;
+    
+    int remainder2 = partitions.size() % groups;
+    int storageNodeGroupSize = partitions.size() / groups;
+    
+    for(int i = 0; i < groups; i++)
+    {
+      int relayStart = 0, relayEnd = 0, storageNodeStart = 0, storageNodeEnd = 0;
+      if(i < remainder)
+      {
+        relayStart = (replica + 1) * i;
+        relayEnd = (replica + 1) *(i + 1);
+      }
+      else
+      {
+        relayStart = (replica + 1) * remainder + replica * (i - remainder);
+        relayEnd = relayStart + replica;
+      }
+      //System.out.println("relay start :" + relayStart + " relayEnd:" + relayEnd);
+      if(i < remainder2)
+      {
+        storageNodeStart = (storageNodeGroupSize + 1) * i;
+        storageNodeEnd = (storageNodeGroupSize + 1) *(i + 1);
+      }
+      else
+      {
+        storageNodeStart = (storageNodeGroupSize + 1) * remainder2 + storageNodeGroupSize * (i - remainder2);
+        storageNodeEnd = storageNodeStart + storageNodeGroupSize;
+      }
+
+      //System.out.println("storageNodeStart :" + storageNodeStart + " storageNodeEnd:" + storageNodeEnd);
+      List<String> snBatch = partitions.subList(storageNodeStart, storageNodeEnd);
+      List<String> relayBatch = instances.subList(relayStart, relayEnd);
+      
+      Map<String, List<String>> sublistFields = calculateSubIdealState(snBatch, relayBatch, replica);
+      
+      result.getRecord().getListFields().putAll(sublistFields);
+    }
+    
+    for(String snName : result.getRecord().getListFields().keySet())
+    {
+      Map<String, String> mapField = new TreeMap<String, String>();
+      List<String> relayCandidates = result.getRecord().getListField(snName);
+      mapField.put(relayCandidates.get(0), firstValue);
+      for(int i = 1; i < relayCandidates.size(); i++)
+      {
+        mapField.put(relayCandidates.get(i), restValue);
+      }
+      result.getRecord().getMapFields().put(snName, mapField);
+    }
+    System.out.println();
+    return result;
+  }
+
+  private static Map<String, List<String>> calculateSubIdealState(
+      List<String> snBatch, List<String> relayBatch, int replica)
+  {
+    Map<String, List<String>> result = new HashMap<String, List<String>>();
+    for(int i = 0; i < snBatch.size(); i++)
+    {
+      String snName = snBatch.get(i);
+      result.put(snName, new ArrayList<String>());
+      for(int j = 0; j < replica; j++)
+      {
+        result.get(snName).add(relayBatch.get((j + i) % (relayBatch.size())));
+      }
+    }
+    return result;
+  }
+}
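
A hedged usage sketch for calculateRelayIdealState above (resource, partition,
instance and state names are all illustrative; note that the instance count
must be divisible by the replica count, per the check at the top of the
method):

    List<String> partitions = new ArrayList<String>();
    for (int i = 0; i < 8; i++)
    {
      partitions.add("relayGroup_" + i);
    }
    List<String> relays = new ArrayList<String>();
    for (int i = 0; i < 6; i++)
    {
      relays.add("relay_" + i);
    }
    // 6 relays, 3 replicas per partition: the relays split into 2 sub-groups,
    // and each partition's first candidate gets "ONLINE", the rest "OFFLINE"
    IdealState idealState = IdealStateCalculatorForEspressoRelay.calculateRelayIdealState(
        partitions, relays, "relayResource", 3, "ONLINE", "OFFLINE", "OnlineOffline");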