You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[27/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
new file mode 100644
index 0000000..0f53724
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -0,0 +1,1089 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.file.FileDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+
+
+public class TestHelper
+{
+  private static final Logger LOG = Logger.getLogger(TestHelper.class);
+
+  static public ZkServer startZkSever(final String zkAddress) throws Exception
+  {
+    List<String> empty = Collections.emptyList();
+    return TestHelper.startZkSever(zkAddress, empty);
+  }
+
+  static public ZkServer startZkSever(final String zkAddress, final String rootNamespace) throws Exception
+  {
+    List<String> rootNamespaces = new ArrayList<String>();
+    rootNamespaces.add(rootNamespace);
+    return TestHelper.startZkSever(zkAddress, rootNamespaces);
+  }
+
+  static public ZkServer startZkSever(final String zkAddress,
+                                      final List<String> rootNamespaces) throws Exception
+  {
+    System.out.println("Start zookeeper at " + zkAddress + " in thread "
+        + Thread.currentThread().getName());
+
+    String zkDir = zkAddress.replace(':', '_');
+    final String logDir = "/tmp/" + zkDir + "/logs";
+    final String dataDir = "/tmp/" + zkDir + "/dataDir";
+    FileUtils.deleteDirectory(new File(dataDir));
+    FileUtils.deleteDirectory(new File(logDir));
+    ZKClientPool.reset();
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
+      {
+        for (String rootNamespace : rootNamespaces)
+        {
+          try
+          {
+            zkClient.deleteRecursive(rootNamespace);
+          }
+          catch (Exception e)
+          {
+            LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:"
+                + e);
+          }
+        }
+      }
+    };
+
+    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+
+    return zkServer;
+  }
+
+  static public void stopZkServer(ZkServer zkServer)
+  {
+    if (zkServer != null)
+    {
+      zkServer.shutdown();
+      System.out.println("Shut down zookeeper at port " + zkServer.getPort()
+          + " in thread " + Thread.currentThread().getName());
+    }
+  }
+
+  public static StartCMResult startDummyProcess(final String zkAddr,
+                                                final String clusterName,
+                                                final String instanceName) throws Exception
+  {
+    StartCMResult result = new StartCMResult();
+    HelixManager manager = null;
+    manager =
+        HelixManagerFactory.getZKHelixManager(clusterName,
+                                              instanceName,
+                                              InstanceType.PARTICIPANT,
+                                              zkAddr);
+    result._manager = manager;
+    Thread thread = new Thread(new DummyProcessThread(manager, instanceName));
+    result._thread = thread;
+    thread.start();
+
+    return result;
+  }
+
+  // TODO refactor this
+  public static StartCMResult startController(final String clusterName,
+                                              final String controllerName,
+                                              final String zkConnectString,
+                                              final String controllerMode) throws Exception
+  {
+    final StartCMResult result = new StartCMResult();
+    final HelixManager manager =
+        HelixControllerMain.startHelixController(zkConnectString,
+                                                 clusterName,
+                                                 controllerName,
+                                                 controllerMode);
+    result._manager = manager;
+
+    Thread thread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        // ClusterManager manager = null;
+
+        try
+        {
+
+          Thread.currentThread().join();
+        }
+        catch (InterruptedException e)
+        {
+          String msg =
+              "controller:" + controllerName + ", " + Thread.currentThread().getName()
+                  + " interrupted";
+          LOG.info(msg);
+          // System.err.println(msg);
+        }
+        catch (Exception e)
+        {
+          e.printStackTrace();
+        }
+      }
+    });
+
+    thread.start();
+    result._thread = thread;
+    return result;
+  }
+
+  public static class StartCMResult
+  {
+    public Thread       _thread;
+    public HelixManager _manager;
+
+  }
+
+  public static void setupEmptyCluster(ZkClient zkClient, String clusterName)
+  {
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
+    admin.addCluster(clusterName, true);
+  }
+
+  /**
+   * convert T[] to set<T>
+   * 
+   * @param s
+   * @return
+   */
+  public static <T> Set<T> setOf(T... s)
+  {
+    Set<T> set = new HashSet<T>(Arrays.asList(s));
+    return set;
+  }
+
+  public static void verifyWithTimeout(String verifierName, Object... args)
+  {
+    verifyWithTimeout(verifierName, 30 * 1000, args);
+  }
+
+  /**
+   * generic method for verification with a timeout
+   * 
+   * @param verifierName
+   * @param args
+   */
+  public static void verifyWithTimeout(String verifierName, long timeout, Object... args)
+  {
+    final long sleepInterval = 1000; // in ms
+    final int loop = (int) (timeout / sleepInterval) + 1;
+    try
+    {
+      boolean result = false;
+      int i = 0;
+      for (; i < loop; i++)
+      {
+        Thread.sleep(sleepInterval);
+        // verifier should be static method
+        result = (Boolean) TestHelper.getMethod(verifierName).invoke(null, args);
+
+        if (result == true)
+        {
+          break;
+        }
+      }
+
+      // debug
+      // LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify ("
+      // + result + ")");
+      System.err.println(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify "
+          + " (" + result + ")");
+      LOG.debug("args:" + Arrays.toString(args));
+      // System.err.println("args:" + Arrays.toString(args));
+
+      if (result == false)
+      {
+        LOG.error(verifierName + " fails");
+        LOG.error("args:" + Arrays.toString(args));
+      }
+
+      Assert.assertTrue(result);
+    }
+    catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  private static Method getMethod(String name)
+  {
+    Method[] methods = TestHelper.class.getMethods();
+    for (Method method : methods)
+    {
+      if (name.equals(method.getName()))
+      {
+        return method;
+      }
+    }
+    return null;
+  }
+
+  // for file-based cluster manager
+  public static boolean verifyEmptyCurStateFile(String clusterName,
+                                                String resourceName,
+                                                Set<String> instanceNames,
+                                                FilePropertyStore<ZNRecord> filePropertyStore)
+  {
+    DataAccessor accessor = new FileDataAccessor(filePropertyStore, clusterName);
+
+    for (String instanceName : instanceNames)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+                                     clusterName,
+                                     instanceName);
+      List<String> subPaths =
+          accessor.getChildNames(PropertyType.CURRENTSTATES, instanceName);
+
+      for (String previousSessionId : subPaths)
+      {
+        if (filePropertyStore.exists(path + "/" + previousSessionId + "/" + resourceName))
+        {
+          CurrentState previousCurrentState =
+              accessor.getProperty(CurrentState.class,
+                                   PropertyType.CURRENTSTATES,
+                                   instanceName,
+                                   previousSessionId,
+                                   resourceName);
+
+          if (previousCurrentState.getRecord().getMapFields().size() != 0)
+          {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  public static boolean verifyEmptyCurStateAndExtView(String clusterName,
+                                                      String resourceName,
+                                                      Set<String> instanceNames,
+                                                      String zkAddr)
+  {
+    ZkClient zkClient = new ZkClient(zkAddr);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    try
+    {
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      for (String instanceName : instanceNames)
+      {
+        List<String> sessionIds =
+            accessor.getChildNames(keyBuilder.sessions(instanceName));
+
+        for (String sessionId : sessionIds)
+        {
+          CurrentState curState =
+              accessor.getProperty(keyBuilder.currentState(instanceName,
+                                                           sessionId,
+                                                           resourceName));
+
+          if (curState != null && curState.getRecord().getMapFields().size() != 0)
+          {
+            return false;
+          }
+        }
+
+        ExternalView extView =
+            accessor.getProperty(keyBuilder.externalView(resourceName));
+
+        if (extView != null && extView.getRecord().getMapFields().size() != 0)
+        {
+          return false;
+        }
+
+      }
+
+      return true;
+    }
+    finally
+    {
+      zkClient.close();
+    }
+  }
+
+  public static boolean verifyNotConnected(HelixManager manager)
+  {
+    return !manager.isConnected();
+  }
+
+  public static void setupCluster(String clusterName,
+                                  String zkAddr,
+                                  int startPort,
+                                  String participantNamePrefix,
+                                  String resourceNamePrefix,
+                                  int resourceNb,
+                                  int partitionNb,
+                                  int nodesNb,
+                                  int replica,
+                                  String stateModelDef,
+                                  boolean doRebalance) throws Exception
+  {
+    TestHelper.setupCluster(clusterName,
+                            zkAddr,
+                            startPort,
+                            participantNamePrefix,
+                            resourceNamePrefix,
+                            resourceNb,
+                            partitionNb,
+                            nodesNb,
+                            replica,
+                            stateModelDef,
+                            IdealStateModeProperty.AUTO,
+                            doRebalance);
+  }
+
+  public static void setupCluster(String clusterName,
+                                  String ZkAddr,
+                                  int startPort,
+                                  String participantNamePrefix,
+                                  String resourceNamePrefix,
+                                  int resourceNb,
+                                  int partitionNb,
+                                  int nodesNb,
+                                  int replica,
+                                  String stateModelDef,
+                                  IdealStateModeProperty mode,
+                                  boolean doRebalance) throws Exception
+  {
+    ZkClient zkClient = new ZkClient(ZkAddr);
+    if (zkClient.exists("/" + clusterName))
+    {
+      LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+      zkClient.deleteRecursive("/" + clusterName);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+    setupTool.addCluster(clusterName, true);
+
+    for (int i = 0; i < nodesNb; i++)
+    {
+      int port = startPort + i;
+      setupTool.addInstanceToCluster(clusterName, participantNamePrefix + ":" + port);
+    }
+
+    for (int i = 0; i < resourceNb; i++)
+    {
+      String dbName = resourceNamePrefix + i;
+      setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef, mode.toString());
+      if (doRebalance)
+      {
+        setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+      }
+    }
+    zkClient.close();
+  }
+
+  /**
+   * 
+   * @param stateMap
+   *          : "ResourceName/partitionKey" -> setOf(instances)
+   * @param state
+   *          : MASTER|SLAVE|ERROR...
+   */
+  public static void verifyState(String clusterName,
+                                 String zkAddr,
+                                 Map<String, Set<String>> stateMap,
+                                 String state)
+  {
+    ZkClient zkClient = new ZkClient(zkAddr);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    try
+    {
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      for (String resGroupPartitionKey : stateMap.keySet())
+      {
+        Map<String, String> retMap = getResourceAndPartitionKey(resGroupPartitionKey);
+        String resGroup = retMap.get("RESOURCE");
+        String partitionKey = retMap.get("PARTITION");
+
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
+        for (String instance : stateMap.get(resGroupPartitionKey))
+        {
+          String actualState = extView.getStateMap(partitionKey).get(instance);
+          Assert.assertNotNull(actualState, "externalView doesn't contain state for "
+              + resGroup + "/" + partitionKey + " on " + instance + " (expect " + state
+              + ")");
+
+          Assert.assertEquals(actualState, state, "externalView for " + resGroup + "/"
+              + partitionKey + " on " + instance + " is " + actualState + " (expect "
+              + state + ")");
+        }
+      }
+    }
+    finally
+    {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * 
+   * @param resourcePartition
+   *          : key is in form of "resource/partitionKey" or "resource_x"
+   * 
+   * @return
+   */
+  private static Map<String, String> getResourceAndPartitionKey(String resourcePartition)
+  {
+    String resourceName;
+    String partitionName;
+    int idx = resourcePartition.indexOf('/');
+    if (idx > -1)
+    {
+      resourceName = resourcePartition.substring(0, idx);
+      partitionName = resourcePartition.substring(idx + 1);
+    }
+    else
+    {
+      idx = resourcePartition.lastIndexOf('_');
+      resourceName = resourcePartition.substring(0, idx);
+      partitionName = resourcePartition;
+    }
+
+    Map<String, String> retMap = new HashMap<String, String>();
+    retMap.put("RESOURCE", resourceName);
+    retMap.put("PARTITION", partitionName);
+    return retMap;
+  }
+
+  public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
+                                                            final Callable<T> method,
+                                                            final long timeout)
+  {
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+    final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+    final List<Thread> threadList = new ArrayList<Thread>();
+
+    for (int i = 0; i < nrThreads; i++)
+    {
+      Thread thread = new Thread()
+      {
+        @Override
+        public void run()
+        {
+          try
+          {
+            boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+            if (isTimeout)
+            {
+              LOG.error("Timeout while waiting for start latch");
+            }
+          }
+          catch (InterruptedException ex)
+          {
+            LOG.error("Interrupted while waiting for start latch");
+          }
+
+          try
+          {
+            T result = method.call();
+            if (result != null)
+            {
+              resultsMap.put("thread_" + this.getId(), result);
+            }
+            LOG.debug("result=" + result);
+          }
+          catch (Exception e)
+          {
+            LOG.error("Exeption in executing " + method.getClass().getName(), e);
+          }
+
+          finishCounter.countDown();
+        }
+      };
+      threadList.add(thread);
+      thread.start();
+    }
+    startLatch.countDown();
+
+    // wait for all thread to complete
+    try
+    {
+      boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+      if (isTimeout)
+      {
+        LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
+        for (Thread thread : threadList)
+        {
+          thread.interrupt();
+        }
+      }
+    }
+    catch (InterruptedException e)
+    {
+      LOG.error("Interrupted while waiting for finish latch", e);
+    }
+
+    return resultsMap;
+  }
+
+  public static Message createMessage(String msgId,
+                                      String fromState,
+                                      String toState,
+                                      String tgtName,
+                                      String resourceName,
+                                      String partitionName)
+  {
+    Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
+    msg.setFromState(fromState);
+    msg.setToState(toState);
+    msg.setTgtName(tgtName);
+    msg.setResourceName(resourceName);
+    msg.setPartitionName(partitionName);
+    msg.setStateModelDef("MasterSlave");
+
+    return msg;
+  }
+
+  public static int numberOfListeners(String zkAddr, String path) throws Exception
+  {
+    int count = 0;
+    String splits[] = zkAddr.split(":");
+    Socket sock = new Socket(splits[0], Integer.parseInt(splits[1]));
+    PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
+    BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+
+    out.println("wchp");
+
+    String line = in.readLine();
+    while (line != null)
+    {
+      // System.out.println(line);
+      if (line.equals(path))
+      {
+        // System.out.println("match: " + line);
+
+        String nextLine = in.readLine();
+        if (nextLine == null)
+        {
+          break;
+        }
+        // System.out.println(nextLine);
+        while (nextLine.startsWith("\t0x"))
+        {
+          count++;
+          nextLine = in.readLine();
+          if (nextLine == null)
+          {
+            break;
+          }
+        }
+      }
+      line = in.readLine();
+    }
+    sock.close();
+    return count;
+  }
+
+  public static String getTestMethodName()
+  {
+    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+    return calls[2].getMethodName();
+  }
+
+  public static String getTestClassName()
+  {
+    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+    String fullClassName = calls[2].getClassName();
+    return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
+  }
+
+  public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
+                                                            final long timeout)
+  {
+    final int nrThreads = methods.size();
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+    final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+    final List<Thread> threadList = new ArrayList<Thread>();
+
+    for (int i = 0; i < nrThreads; i++)
+    {
+      final Callable<T> method = methods.get(i);
+
+      Thread thread = new Thread()
+      {
+        @Override
+        public void run()
+        {
+          try
+          {
+            boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+            if (isTimeout)
+            {
+              LOG.error("Timeout while waiting for start latch");
+            }
+          }
+          catch (InterruptedException ex)
+          {
+            LOG.error("Interrupted while waiting for start latch");
+          }
+
+          try
+          {
+            T result = method.call();
+            if (result != null)
+            {
+              resultsMap.put("thread_" + this.getId(), result);
+            }
+            LOG.debug("result=" + result);
+          }
+          catch (Exception e)
+          {
+            LOG.error("Exeption in executing " + method.getClass().getName(), e);
+          }
+
+          finishCounter.countDown();
+        }
+      };
+      threadList.add(thread);
+      thread.start();
+    }
+    startLatch.countDown();
+
+    // wait for all thread to complete
+    try
+    {
+      boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+      if (isTimeout)
+      {
+        LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
+        for (Thread thread : threadList)
+        {
+          thread.interrupt();
+        }
+      }
+    }
+    catch (InterruptedException e)
+    {
+      LOG.error("Interrupted while waiting for finish latch", e);
+    }
+
+    return resultsMap;
+  }
+
+  public static void printCache(Map<String, ZNode> cache)
+  {
+    System.out.println("START:Print cache");
+    TreeMap<String, ZNode> map = new TreeMap<String, ZNode>();
+    map.putAll(cache);
+
+    for (String key : map.keySet())
+    {
+      ZNode node = map.get(key);
+      TreeSet<String> childSet = new TreeSet<String>();
+      childSet.addAll(node.getChildSet());
+      System.out.print(key + "=" + node.getData() + ", " + childSet + ", "
+          + (node.getStat() == null ? "null\n" : node.getStat()));
+    }
+    System.out.println("END:Print cache");
+  }
+
+  public static void readZkRecursive(String path,
+                                     Map<String, ZNode> map,
+                                     ZkClient zkclient)
+  {
+    try
+    {
+      Stat stat = new Stat();
+      ZNRecord record = zkclient.readData(path, stat);
+      List<String> childNames = zkclient.getChildren(path);
+      ZNode node = new ZNode(path, record, stat);
+      node.addChildren(childNames);
+      map.put(path, node);
+
+      for (String childName : childNames)
+      {
+        String childPath = path + "/" + childName;
+        readZkRecursive(childPath, map, zkclient);
+      }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK
+    }
+  }
+
+  public static void readZkRecursive(String path,
+                                     Map<String, ZNode> map,
+                                     BaseDataAccessor<ZNRecord> zkAccessor)
+  {
+    try
+    {
+      Stat stat = new Stat();
+      ZNRecord record = zkAccessor.get(path, stat, 0);
+      List<String> childNames = zkAccessor.getChildNames(path, 0);
+      // System.out.println("childNames: " + childNames);
+      ZNode node = new ZNode(path, record, stat);
+      node.addChildren(childNames);
+      map.put(path, node);
+
+      if (childNames != null && !childNames.isEmpty())
+      {
+        for (String childName : childNames)
+        {
+          String childPath = path + "/" + childName;
+          readZkRecursive(childPath, map, zkAccessor);
+        }
+      }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK
+    }
+  }
+
+  public static boolean verifyZkCache(List<String> paths,
+                                      BaseDataAccessor<ZNRecord> zkAccessor,
+                                      ZkClient zkclient,
+                                      boolean needVerifyStat)
+  {
+    // read everything
+    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
+    Map<String, ZNode> cache = new HashMap<String, ZNode>();
+    for (String path : paths)
+    {
+      readZkRecursive(path, zkMap, zkclient);
+      readZkRecursive(path, cache, zkAccessor);
+    }
+    // printCache(map);
+
+    return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
+  }
+
+  public static boolean verifyZkCache(List<String> paths,
+                                      Map<String, ZNode> cache,
+                                      ZkClient zkclient,
+                                      boolean needVerifyStat)
+  {
+    return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
+  }
+
+  public static boolean verifyZkCache(List<String> paths,
+                                      List<String> pathsExcludeForStat,
+                                      Map<String, ZNode> cache,
+                                      ZkClient zkclient,
+                                      boolean needVerifyStat)
+  {
+    // read everything on zk under paths
+    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
+    for (String path : paths)
+    {
+      readZkRecursive(path, zkMap, zkclient);
+    }
+    // printCache(map);
+
+    return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
+  }
+
+  public static boolean verifyZkCache(List<String> paths,
+                                      List<String> pathsExcludeForStat,
+                                      Map<String, ZNode> cache,
+                                      Map<String, ZNode> zkMap,
+                                      boolean needVerifyStat)
+  {
+    // equal size
+    if (zkMap.size() != cache.size())
+    {
+      System.err.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: "
+          + zkMap.size());
+      System.out.println("cache: (" + cache.size() + ")");
+      TestHelper.printCache(cache);
+
+      System.out.println("zkMap: (" + zkMap.size() + ")");
+      TestHelper.printCache(zkMap);
+
+      return false;
+    }
+
+    // everything in cache is also in map
+    for (String path : cache.keySet())
+    {
+      ZNode cacheNode = cache.get(path);
+      ZNode zkNode = zkMap.get(path);
+
+      if (zkNode == null)
+      {
+        // in cache but not on zk
+        System.err.println("path: " + path + " in cache but not on zk: inCacheNode: "
+            + cacheNode);
+        return false;
+      }
+
+      if ((zkNode.getData() == null && cacheNode.getData() != null)
+          || (zkNode.getData() != null && cacheNode.getData() == null)
+          || (zkNode.getData() != null && cacheNode.getData() != null && !zkNode.getData()
+                                                                                .equals(cacheNode.getData())))
+      {
+        // data not equal
+        System.err.println("data mismatch on path: " + path + ", inCache: "
+            + cacheNode.getData() + ", onZk: " + zkNode.getData());
+        return false;
+      }
+
+      if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null)
+          || (zkNode.getChildSet() != null && cacheNode.getChildSet() == null)
+          || (zkNode.getChildSet() != null && cacheNode.getChildSet() != null && !zkNode.getChildSet()
+                                                                                        .equals(cacheNode.getChildSet())))
+      {
+        // childSet not equal
+        System.err.println("childSet mismatch on path: " + path + ", inCache: "
+            + cacheNode.getChildSet() + ", onZk: " + zkNode.getChildSet());
+        return false;
+      }
+
+      if (needVerifyStat && pathsExcludeForStat != null
+          && !pathsExcludeForStat.contains(path))
+      {
+        if (cacheNode.getStat() == null || !zkNode.getStat().equals(cacheNode.getStat()))
+        {
+          // stat not equal
+          System.err.println("Stat mismatch on path: " + path + ", inCache: "
+              + cacheNode.getStat() + ", onZk: " + zkNode.getStat());
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  public static StateModelDefinition generateStateModelDefForBootstrap()
+  {
+    ZNRecord record = new ZNRecord("Bootstrap");
+    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("ONLINE");
+    statePriorityList.add("BOOTSTRAP");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("IDLE");
+    statePriorityList.add("DROPPED");
+    statePriorityList.add("ERROR");
+    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+                        statePriorityList);
+    for (String state : statePriorityList)
+    {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("ONLINE"))
+      {
+        metadata.put("count", "R");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("BOOTSTRAP"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("OFFLINE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("IDLE"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("DROPPED"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("ERROR"))
+      {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+    }
+
+    for (String state : statePriorityList)
+    {
+      String key = state + ".next";
+      if (state.equals("ONLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("BOOTSTRAP", "OFFLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        metadata.put("IDLE", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("BOOTSTRAP"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "ONLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        metadata.put("IDLE", "OFFLINE");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("OFFLINE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "BOOTSTRAP");
+        metadata.put("BOOTSTRAP", "BOOTSTRAP");
+        metadata.put("DROPPED", "IDLE");
+        metadata.put("IDLE", "IDLE");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("IDLE"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "OFFLINE");
+        metadata.put("BOOTSTRAP", "OFFLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      }
+      else if (state.equals("ERROR"))
+      {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("IDLE", "IDLE");
+        record.setMapField(key, metadata);
+      }
+    }
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("ONLINE-OFFLINE");
+    stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
+    stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
+    stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-IDLE");
+    stateTransitionPriorityList.add("IDLE-OFFLINE");
+    stateTransitionPriorityList.add("IDLE-DROPPED");
+    stateTransitionPriorityList.add("ERROR-IDLED");
+    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                        stateTransitionPriorityList);
+    return new StateModelDefinition(record);
+  }
+
+  public static String znrecordToString(ZNRecord record)
+  {
+    StringBuffer sb = new StringBuffer();
+    sb.append(record.getId() + "\n");
+    Map<String, String> simpleFields = record.getSimpleFields();
+    if (simpleFields != null)
+    {
+      sb.append("simpleFields\n");
+      for (String key : simpleFields.keySet())
+      {
+        sb.append("  " + key + "\t: " + simpleFields.get(key) + "\n");
+      }
+    }
+
+    Map<String, List<String>> listFields = record.getListFields();
+    sb.append("listFields\n");
+    for (String key : listFields.keySet())
+    {
+      List<String> list = listFields.get(key);
+      sb.append("  " + key + "\t: ");
+      for (String listValue : list)
+      {
+        sb.append(listValue + ", ");
+      }
+      sb.append("\n");
+    }
+
+    Map<String, Map<String, String>> mapFields = record.getMapFields();
+    sb.append("mapFields\n");
+    for (String key : mapFields.keySet())
+    {
+      Map<String, String> map = mapFields.get(key);
+      sb.append("  " + key + "\t: \n");
+      for (String mapKey : map.keySet())
+      {
+        sb.append("    " + mapKey + "\t: " + map.get(mapKey) + "\n");
+      }
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
new file mode 100644
index 0000000..81ea6bc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HierarchicalDataHolder;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import java.io.FileFilter;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestHierarchicalDataStore extends ZkUnitTestBase
+{
+  protected static ZkClient _zkClientString = null;
+
+  @Test (groups = {"unitTest"})
+  public void testHierarchicalDataStore()
+  {
+    _zkClientString = new ZkClient(ZK_ADDR, 1000, 3000);
+
+    String path = "/tmp/testHierarchicalDataStore";
+    FileFilter filter = null;
+    // _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    _zkClientString.deleteRecursive(path);
+    HierarchicalDataHolder<ZNRecord> dataHolder = new HierarchicalDataHolder<ZNRecord>(
+        _zkClientString, path, filter);
+    dataHolder.print();
+    AssertJUnit.assertFalse(dataHolder.refreshData());
+
+    // write data
+    add(path, "root data");
+    AssertJUnit.assertTrue(dataHolder.refreshData());
+    dataHolder.print();
+
+    // add some children
+    add(path + "/child1", "child 1 data");
+    add(path + "/child2", "child 2 data");
+    AssertJUnit.assertTrue(dataHolder.refreshData());
+    dataHolder.print();
+
+    // add some grandchildren
+    add(path + "/child1" + "/grandchild1", "grand child 1 data");
+    add(path + "/child1" + "/grandchild2", "grand child 2 data");
+    AssertJUnit.assertTrue(dataHolder.refreshData());
+    dataHolder.print();
+    
+    AssertJUnit.assertFalse(dataHolder.refreshData());
+    
+    set(path + "/child1", "new child 1 data");
+    AssertJUnit.assertTrue(dataHolder.refreshData());
+    dataHolder.print();
+  }
+
+  private void set(String path, String data)
+  {
+    _zkClientString.writeData(path, data);
+  }
+
+  private void add(String path, String data)
+  {
+    _zkClientString.createPersistent(path, true);
+    _zkClientString.writeData(path, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
new file mode 100644
index 0000000..406f92f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks.MockHealthReportProvider;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.healthcheck.*;
+import org.testng.annotations.Test;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+public class TestParticipantHealthReportCollectorImpl {
+
+	protected ParticipantHealthReportCollectorImpl _providerImpl;
+	protected HelixManager _manager;
+	protected MockHealthReportProvider _mockProvider;
+	
+	 @BeforeMethod (groups = {"unitTest"})
+	public void setup()
+	{
+		 _providerImpl = new ParticipantHealthReportCollectorImpl(new MockManager(), "instance_123");
+		 _mockProvider = new MockHealthReportProvider();
+	}
+	
+	 @Test (groups = {"unitTest"})
+	  public void testStart() throws Exception
+	  {
+		 _providerImpl.start();
+		 _providerImpl.start();
+	  }
+	 
+	 @Test (groups = {"unitTest"})
+	  public void testStop() throws Exception
+	  {
+		 _providerImpl.stop();
+		 _providerImpl.stop();
+	  }
+	 
+	 @Test (groups = {"unitTest"})
+	 public void testAddProvider() throws Exception 
+	 {
+		 _providerImpl.removeHealthReportProvider(_mockProvider);
+		 _providerImpl.addHealthReportProvider(_mockProvider);
+		 _providerImpl.addHealthReportProvider(_mockProvider);
+	 }
+	 
+	 @Test (groups = {"unitTest"})
+	 public void testRemoveProvider() throws Exception
+	 {
+		 _providerImpl.addHealthReportProvider(_mockProvider);
+		 _providerImpl.removeHealthReportProvider(_mockProvider);
+		 _providerImpl.removeHealthReportProvider(_mockProvider);
+	 }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
new file mode 100644
index 0000000..54e96cf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+
+import org.apache.helix.healthcheck.DefaultPerfCounters;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+public class TestPerfCounters {
+
+	final String INSTANCE_NAME = "instance_123";
+    final long AVAILABLE_CPUS = 1;
+    final long FREE_PHYSICAL_MEMORY = 2;
+    final long FREE_JVM_MEMORY = 3;
+    final long TOTAL_JVM_MEMORY = 4;
+    final double AVERAGE_SYSTEM_LOAD = 5;
+
+	DefaultPerfCounters _perfCounters;
+
+	@BeforeTest ()
+	public void setup()
+	{
+		_perfCounters = new DefaultPerfCounters(INSTANCE_NAME, AVAILABLE_CPUS,
+				FREE_PHYSICAL_MEMORY, FREE_JVM_MEMORY, TOTAL_JVM_MEMORY,
+				AVERAGE_SYSTEM_LOAD);
+	}
+
+	 @Test ()
+	 public void testGetAvailableCpus()
+	 {
+		 AssertJUnit.assertEquals(AVAILABLE_CPUS,_perfCounters.getAvailableCpus());
+	 }
+
+	 @Test ()
+	 public void testGetAverageSystemLoad()
+	 {
+		 AssertJUnit.assertEquals(AVERAGE_SYSTEM_LOAD,_perfCounters.getAverageSystemLoad());
+	 }
+
+	 @Test ()
+	 public void testGetTotalJvmMemory()
+	 {
+		 AssertJUnit.assertEquals(TOTAL_JVM_MEMORY,_perfCounters.getTotalJvmMemory());
+	 }
+
+	 @Test ()
+	 public void testGetFreeJvmMemory()
+	 {
+		 AssertJUnit.assertEquals(FREE_JVM_MEMORY,_perfCounters.getFreeJvmMemory());
+	 }
+
+	 @Test ()
+	 public void testGetFreePhysicalMemory()
+	 {
+		 AssertJUnit.assertEquals(FREE_PHYSICAL_MEMORY,_perfCounters.getFreePhysicalMemory());
+	 }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java b/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
new file mode 100644
index 0000000..f0ea906
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.PerformanceHealthReportProvider;
+import org.apache.helix.model.HealthStat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestPerformanceHealthReportProvider {
+
+	 protected static final String CLUSTER_NAME = "TestCluster";
+	 protected final String STAT_NAME = "Stat_123";
+	 protected final String PARTITION_NAME = "Partition_456";
+	 protected final String FAKE_STAT_NAME = "Stat_ABC";
+	 protected final String FAKE_PARTITION_NAME = "Partition_DEF";
+	 protected final String STORED_STAT = "789";
+	 protected final String INSTANCE_NAME = "instance:1";
+
+	PerformanceHealthReportProvider _healthProvider;
+	MockManager _helixManager;
+
+	public void incrementPartitionStat() throws Exception
+	{
+		_helixManager = new MockManager(CLUSTER_NAME);
+		_healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+	}
+
+	public void transmitReport() throws Exception
+	{
+		_helixManager = new MockManager(CLUSTER_NAME);
+		 Map<String, Map<String, String>> partitionReport = _healthProvider
+	                .getRecentPartitionHealthReport();
+		 ZNRecord record = new ZNRecord(_healthProvider.getReportName());
+		 if (partitionReport != null) {
+         	record.setMapFields(partitionReport);
+         }
+		 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+
+		 Builder keyBuilder = accessor.keyBuilder();
+		 accessor.setProperty(keyBuilder.healthReport(INSTANCE_NAME, record.getId()), new HealthStat(record));
+	}
+
+	@BeforeMethod ()
+	public void setup()
+	{
+		_healthProvider = new PerformanceHealthReportProvider();
+	}
+
+	 @Test ()
+	  public void testGetRecentHealthReports() throws Exception
+	  {
+		 _healthProvider.getRecentHealthReport();
+		 _healthProvider.getRecentPartitionHealthReport();
+	  }
+
+	 @Test ()
+	 public void testIncrementPartitionStat() throws Exception
+	 {
+		 //stat does not exist yet
+		 _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+		 transmitReport();
+		 //stat does exist
+		 _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+		 transmitReport();
+		 String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+		 AssertJUnit.assertEquals(2.0, Double.parseDouble(retrievedStat));
+
+		 //set to some other value
+		 _healthProvider.submitPartitionStat(STAT_NAME, PARTITION_NAME, STORED_STAT);
+		 transmitReport();
+		 _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+		 transmitReport();
+		 retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+		 AssertJUnit.assertEquals(Double.parseDouble(retrievedStat), Double.parseDouble(STORED_STAT)+1);
+	 }
+
+	 @Test ()
+	 public void testSetGetPartitionStat() throws Exception
+	 {
+		 _healthProvider.submitPartitionStat(STAT_NAME, PARTITION_NAME, STORED_STAT);
+		 transmitReport();
+		 String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+		 //check on correct retrieval for real stat, real partition
+		 AssertJUnit.assertEquals(STORED_STAT, retrievedStat);
+
+		 //real stat, fake partition
+		 retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, FAKE_PARTITION_NAME);
+		 AssertJUnit.assertNull(retrievedStat);
+
+		 //fake stat, real partition
+		 retrievedStat = _healthProvider.getPartitionStat(FAKE_STAT_NAME, PARTITION_NAME);
+		 AssertJUnit.assertNull(retrievedStat);
+
+		 //fake stat, fake partition
+		 retrievedStat = _healthProvider.getPartitionStat(FAKE_STAT_NAME, FAKE_PARTITION_NAME);
+		 AssertJUnit.assertNull(retrievedStat);
+	 }
+
+	 @Test ()
+	 public void testGetPartitionHealthReport() throws Exception
+	 {
+		 //test empty map case
+		 Map<String, Map<String, String>> resultMap = _healthProvider.getRecentPartitionHealthReport();
+		 AssertJUnit.assertEquals(resultMap.size(), 0);
+
+		 //test non-empty case
+		 testSetGetPartitionStat();
+		 resultMap = _healthProvider.getRecentPartitionHealthReport();
+		 //check contains 1 stat
+		 AssertJUnit.assertEquals(1, resultMap.size());
+		 //check contains STAT_NAME STAT
+		 AssertJUnit.assertTrue(resultMap.keySet().contains(STAT_NAME));
+		 Map<String, String> statMap = resultMap.get(STAT_NAME);
+		 //check statMap has size 1
+		 AssertJUnit.assertEquals(1, statMap.size());
+		 //check contains PARTITION_NAME
+		 AssertJUnit.assertTrue(statMap.keySet().contains(PARTITION_NAME));
+		 //check stored val
+		 String statVal = statMap.get(PARTITION_NAME);
+		 AssertJUnit.assertEquals(statVal, STORED_STAT);
+	 }
+
+	 @Test ()
+	 public void testPartitionStatReset() throws Exception
+	 {
+		 incrementPartitionStat();
+		 //ensure stat appears
+		 String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+		 AssertJUnit.assertEquals(1.0, Double.parseDouble(retrievedStat));
+		 //reset partition stats
+		 _healthProvider.resetStats();
+		 transmitReport();
+		 retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+		 AssertJUnit.assertEquals(null, retrievedStat);
+	 }
+
+	 @Test ()
+	  public void testGetReportName() throws Exception
+	  {
+		 _healthProvider.getReportName();
+	  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
new file mode 100644
index 0000000..b585b34
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+
+@Test
+public class TestPropertyPathConfig
+{
+  @Test
+  public void testGetPath()
+  {
+    String actual;
+    actual = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, "test_cluster");
+    AssertJUnit.assertEquals(actual, "/test_cluster/IDEALSTATES");
+    actual = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, "test_cluster","resource");
+    AssertJUnit.assertEquals(actual, "/test_cluster/IDEALSTATES/resource");
+
+    
+    actual = PropertyPathConfig.getPath(PropertyType.INSTANCES, "test_cluster","instanceName1");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1");
+
+    actual = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, "test_cluster","instanceName1");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES");
+    actual = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, "test_cluster","instanceName1","sessionId");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId");
+    
+    actual = PropertyPathConfig.getPath(PropertyType.CONTROLLER, "test_cluster");
+    AssertJUnit.assertEquals(actual, "/test_cluster/CONTROLLER");
+    actual = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, "test_cluster");
+    AssertJUnit.assertEquals(actual, "/test_cluster/CONTROLLER/MESSAGES");
+
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
new file mode 100644
index 0000000..73850f2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
@@ -0,0 +1,70 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.IdealStateCalculatorForEspressoRelay;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestRelayIdealStateCalculator
+{
+  @Test ()
+  public void testEspressoStorageClusterIdealState() throws Exception
+  {
+    testEspressoStorageClusterIdealState(15, 9, 3);
+    testEspressoStorageClusterIdealState(15, 6, 3);
+    testEspressoStorageClusterIdealState(15, 6, 2);
+    testEspressoStorageClusterIdealState(6,  4, 2);
+  }
+  public void testEspressoStorageClusterIdealState(int partitions, int nodes, int replica) throws Exception
+  {
+    List<String> storageNodes = new ArrayList<String>();
+    for(int i = 0;i < partitions; i++)
+    {
+      storageNodes.add("localhost:123" + i);
+    }
+    
+    List<String> relays = new ArrayList<String>();
+    for(int i = 0;i < nodes; i++)
+    {
+      relays.add("relay:123" + i);
+    }
+    
+    IdealState idealstate = IdealStateCalculatorForEspressoRelay.calculateRelayIdealState(storageNodes, relays, "TEST", replica, "Leader", "Standby", "LeaderStandby");
+    
+    Assert.assertEquals(idealstate.getRecord().getListFields().size(), idealstate.getRecord().getMapFields().size());
+    
+    Map<String, Integer> countMap = new TreeMap<String, Integer>();
+    for(String key  : idealstate.getRecord().getListFields().keySet())
+    {
+      Assert.assertEquals(idealstate.getRecord().getListFields().get(key).size(), idealstate.getRecord().getMapFields().get(key).size());
+      List<String> list = idealstate.getRecord().getListFields().get(key);
+      Map<String, String> map = idealstate.getRecord().getMapFields().get(key);
+      Assert.assertEquals(list.size(), replica);
+      for(String val : list)
+      {
+        if(!countMap.containsKey(val))
+        {
+          countMap.put(val, 1);
+        }
+        else
+        {
+          countMap.put(val, countMap.get(val) + 1);
+        }
+        Assert.assertTrue(map.containsKey(val));
+      }
+    }
+    for(String nodeName : countMap.keySet())
+    {
+      Assert.assertTrue(countMap.get(nodeName) <= partitions * replica / nodes + 1);
+      //System.out.println(nodeName + " " + countMap.get(nodeName));
+    }
+    System.out.println();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
new file mode 100644
index 0000000..35eb5cb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
@@ -0,0 +1,289 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestRoutingTable
+{
+  NotificationContext changeContext = null;
+
+  @BeforeClass()
+  public synchronized void setup()
+  {
+
+    final String[] array = new String[] { "localhost_8900", "localhost_8901" };
+    HelixManager manager = new Mocks.MockManager() {
+      private MockAccessor _mockAccessor;
+
+      @Override
+//      public DataAccessor getDataAccessor()
+      public HelixDataAccessor getHelixDataAccessor()
+      {
+        if (_mockAccessor == null)
+        {
+          _mockAccessor = new Mocks.MockAccessor() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+//            public List<ZNRecord> getChildValues(PropertyType type, String... keys)
+            {
+              PropertyType type = key.getType();
+              String[] keys = key.getParams();
+              if (type == PropertyType.CONFIGS && keys != null && keys.length > 1
+                  && keys[1].equalsIgnoreCase(ConfigScopeProperty.PARTICIPANT.toString()))
+              {
+                List<InstanceConfig> configs = new ArrayList<InstanceConfig>();
+                for (String instanceName : array)
+                {
+                  InstanceConfig config = new InstanceConfig(instanceName);
+                  String[] splits = instanceName.split("_");
+                  config.setHostName(splits[0]);
+                  config.setPort(splits[1]);
+                  configs.add(config);
+                }
+                return (List<T>) configs;
+              }
+              return Collections.emptyList();
+            };
+          };
+        }
+        return _mockAccessor;
+      }
+    };
+    changeContext = new NotificationContext(manager);
+  }
+
+  @Test()
+  public void testNullAndEmpty()
+  {
+
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    routingTable.onExternalViewChange(null, changeContext);
+    List<ExternalView> list = Collections.emptyList();
+    routingTable.onExternalViewChange(list, changeContext);
+
+  }
+
+  @Test()
+  public void testSimple()
+  {
+    List<InstanceConfig> instances;
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // one master
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 1);
+
+    // additions
+    add(record, "TESTDB_0", "localhost_8901", "MASTER");
+    add(record, "TESTDB_1", "localhost_8900", "SLAVE");
+
+    externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 2);
+
+    instances = routingTable.getInstances("TESTDB", "TESTDB_1", "SLAVE");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 1);
+
+    // updates
+    add(record, "TESTDB_0", "localhost_8901", "SLAVE");
+    externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "SLAVE");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 1);
+  }
+
+  @Test()
+  public void testStateUnitGroupDeletion()
+  {
+    List<InstanceConfig> instances;
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // one master
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 1);
+
+    externalViewList.clear();
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(instances.size(), 0);
+  }
+
+  @Test()
+  public void testGetInstanceForAllStateUnits()
+  {
+    List<InstanceConfig> instancesList;
+    Set<InstanceConfig> instancesSet;
+    InstanceConfig instancesArray[];
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // one master
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    add(record, "TESTDB_1", "localhost_8900", "MASTER");
+    add(record, "TESTDB_2", "localhost_8900", "MASTER");
+    add(record, "TESTDB_3", "localhost_8900", "SLAVE");
+    add(record, "TESTDB_4", "localhost_8900", "SLAVE");
+    add(record, "TESTDB_5", "localhost_8900", "SLAVE");
+
+    add(record, "TESTDB_0", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_1", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_2", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_3", "localhost_8901", "MASTER");
+    add(record, "TESTDB_4", "localhost_8901", "MASTER");
+    add(record, "TESTDB_5", "localhost_8901", "MASTER");
+
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instancesList = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instancesList);
+    AssertJUnit.assertEquals(instancesList.size(), 1);
+    instancesSet = routingTable.getInstances("TESTDB", "MASTER");
+    AssertJUnit.assertNotNull(instancesSet);
+    AssertJUnit.assertEquals(instancesSet.size(), 2);
+    instancesSet = routingTable.getInstances("TESTDB", "SLAVE");
+    AssertJUnit.assertNotNull(instancesSet);
+    AssertJUnit.assertEquals(instancesSet.size(), 2);
+    instancesArray = new InstanceConfig[instancesSet.size()];
+    instancesSet.toArray(instancesArray);
+    AssertJUnit.assertEquals(instancesArray[0].getHostName(), "localhost");
+    AssertJUnit.assertEquals(instancesArray[0].getPort(), "8900");
+    AssertJUnit.assertEquals(instancesArray[1].getHostName(), "localhost");
+    AssertJUnit.assertEquals(instancesArray[1].getPort(), "8901");
+  }
+
+  @Test()
+  public void testMultiThread() throws Exception
+  {
+    final RoutingTableProvider routingTable = new RoutingTableProvider();
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+    for (int i = 0; i < 1000; i++)
+    {
+      add(record, "TESTDB_" + i, "localhost_8900", "MASTER");
+    }
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    Callable<Boolean> runnable = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception
+      {
+
+        try
+        {
+          int count = 0;
+          while (count < 100)
+          {
+            List<InstanceConfig> instancesList = routingTable.getInstances("TESTDB", "TESTDB_0",
+                "MASTER");
+            AssertJUnit.assertEquals(instancesList.size(), 1);
+            // System.out.println(System.currentTimeMillis() + "-->"
+            // + instancesList.size());
+
+            Thread.sleep(5);
+
+            count++;
+          }
+        } catch (InterruptedException e)
+        {
+          // e.printStackTrace();
+        }
+        return true;
+      }
+    };
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    Future<Boolean> submit = executor.submit(runnable);
+    int count = 0;
+    while (count < 10)
+    {
+      try
+      {
+        Thread.sleep(10);
+      } catch (InterruptedException e)
+      {
+        e.printStackTrace();
+      }
+      routingTable.onExternalViewChange(externalViewList, changeContext);
+      count++;
+    }
+
+    Boolean result = submit.get(60, TimeUnit.SECONDS);
+    AssertJUnit.assertEquals(result, Boolean.TRUE);
+
+  }
+
+  private void add(ZNRecord record, String stateUnitKey, String instanceName, String state)
+  {
+    Map<String, String> stateUnitKeyMap = record.getMapField(stateUnitKey);
+    if (stateUnitKeyMap == null)
+    {
+      stateUnitKeyMap = new HashMap<String, String>();
+      record.setMapField(stateUnitKey, stateUnitKeyMap);
+    }
+    stateUnitKeyMap.put(instanceName, state);
+    record.setMapField(stateUnitKey, stateUnitKeyMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestSample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestSample.java b/helix-core/src/test/java/org/apache/helix/TestSample.java
new file mode 100644
index 0000000..0fe0099
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestSample.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ *
+ * @author kgopalak
+ *
+ */
+
+public class TestSample
+{
+
+  @Test ()
+  public final void testCallbackHandler()
+  {
+    String path = null;
+    Object listener = null;
+    EventType[] eventTypes = null;
+
+  }
+
+  @BeforeMethod ()
+  public void asd()
+  {
+    System.out.println("In Set up");
+  }
+
+  @Test ()
+  public void testB()
+  {
+    System.out.println("In method testB");
+
+  }
+
+  @Test ()
+  public void testA()
+  {
+    System.out.println("In method testA");
+
+  }
+
+  @AfterMethod ()
+  public void sfds()
+  {
+    System.out.println("In tear down");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java b/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
new file mode 100644
index 0000000..f0d1309
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.tools.IdealCalculatorByConsistentHashing;
+import org.apache.helix.tools.IdealStateCalculatorByRush;
+import org.apache.helix.tools.IdealStateCalculatorByShuffling;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestShuffledIdealState
+{
+  @Test ()
+  public void testInvocation() throws Exception
+  {
+    int partitions = 6, replicas = 2;
+    String dbName = "espressoDB1";
+    List<String> instanceNames = new ArrayList<String>();
+    instanceNames.add("localhost_1231");
+    instanceNames.add("localhost_1232");
+    instanceNames.add("localhost_1233");
+    instanceNames.add("localhost_1234");
+
+    ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    
+    ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(instanceNames, 1, partitions, replicas, dbName);
+
+    ZNRecord result3 = IdealCalculatorByConsistentHashing.calculateIdealState(instanceNames, partitions, replicas, dbName, new IdealCalculatorByConsistentHashing.FnvHash());
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, "SLAVE");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, "");
+    IdealCalculatorByConsistentHashing.printNodeOfflineOverhead(result3);
+
+    // System.out.println(result);
+    ObjectMapper mapper = new ObjectMapper();
+
+    // ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    StringWriter sw = new StringWriter();
+    try
+    {
+      mapper.writeValue(sw, result);
+      // System.out.println(sw.toString());
+
+      ZNRecord zn = mapper.readValue(new StringReader(sw.toString()),
+          ZNRecord.class);
+      System.out.println(result.toString());
+      System.out.println(zn.toString());
+      AssertJUnit.assertTrue(zn.toString().equalsIgnoreCase(result.toString()));
+      System.out.println();
+
+      sw= new StringWriter();
+      mapper.writeValue(sw, result2);
+
+      ZNRecord zn2 = mapper.readValue(new StringReader(sw.toString()),
+          ZNRecord.class);
+      System.out.println(result2.toString());
+      System.out.println(zn2.toString());
+      AssertJUnit.assertTrue(zn2.toString().equalsIgnoreCase(result2.toString()));
+
+      sw= new StringWriter();
+      mapper.writeValue(sw, result3);
+      System.out.println();
+
+      ZNRecord zn3 = mapper.readValue(new StringReader(sw.toString()),
+          ZNRecord.class);
+      System.out.println(result3.toString());
+      System.out.println(zn3.toString());
+      AssertJUnit.assertTrue(zn3.toString().equalsIgnoreCase(result3.toString()));
+      System.out.println();
+
+    } catch (JsonGenerationException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (JsonMappingException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (IOException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+  
+  @Test
+  public void testShuffledIdealState()
+  {
+    // partitions is larger than nodes
+    int partitions = 6, replicas = 2, instances = 4;
+    String dbName = "espressoDB1";
+    List<String> instanceNames = new ArrayList<String>();
+    instanceNames.add("localhost_1231");
+    instanceNames.add("localhost_1232");
+    instanceNames.add("localhost_1233");
+    instanceNames.add("localhost_1234");
+
+    ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+    
+    // partition is less than nodes
+    instanceNames.clear();
+    partitions = 4; 
+    replicas = 3;
+    instances = 7;
+    
+    for(int i = 0; i<instances; i++)
+    {
+      instanceNames.add("localhost_" + (1231 + i));
+    }
+    result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+    
+    // partitions is multiple of nodes
+    instanceNames.clear();
+    partitions = 14; 
+    replicas = 3;
+    instances = 7;
+    
+    for(int i = 0; i<instances; i++)
+    {
+      instanceNames.add("localhost_" + (1231 + i));
+    }
+    result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+    
+    // nodes are multiple of partitions
+    instanceNames.clear();
+    partitions = 4; 
+    replicas = 3;
+    instances = 8;
+    
+    for(int i = 0; i<instances; i++)
+    {
+      instanceNames.add("localhost_" + (1231 + i));
+    }
+    result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+    
+    // nodes are multiple of partitions
+    instanceNames.clear();
+    partitions = 4; 
+    replicas = 3;
+    instances = 12;
+    
+    for(int i = 0; i<instances; i++)
+    {
+      instanceNames.add("localhost_" + (1231 + i));
+    }
+    result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+    
+    // Just fits
+    instanceNames.clear();
+    partitions = 4; 
+    replicas = 2;
+    instances = 12;
+    
+    for(int i = 0; i<instances; i++)
+    {
+      instanceNames.add("localhost_" + (1231 + i));
+    }
+    result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, dbName);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "MASTER");
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, "SLAVE");
+    Assert.assertTrue(verify(result));
+  }
+  
+  boolean verify(ZNRecord result)
+  {
+    Map<String, Integer> masterPartitionCounts = new HashMap<String, Integer>();
+    Map<String, Integer> slavePartitionCounts = new HashMap<String, Integer>();
+    
+    for(String key : result.getMapFields().keySet())
+    {
+      Map<String, String> mapField = result.getMapField(key);
+      int masterCount = 0;
+      for(String host: mapField.keySet())
+      {
+        if(mapField.get(host).equals("MASTER"))
+        {
+          Assert.assertTrue(masterCount == 0);
+          masterCount ++;
+          if(!masterPartitionCounts.containsKey(host))
+          {
+            masterPartitionCounts.put(host, 0);
+          }
+          else
+          {
+            masterPartitionCounts.put(host, masterPartitionCounts.get(host) + 1);
+          }
+        }
+        else
+        {
+          if(!slavePartitionCounts.containsKey(host))
+          {
+            slavePartitionCounts.put(host, 0);
+          }
+          else
+          {
+            slavePartitionCounts.put(host, slavePartitionCounts.get(host) + 1);
+          }
+        }
+      }
+    }
+    
+    List<Integer> masterCounts = new ArrayList<Integer>();
+    List<Integer> slaveCounts = new ArrayList<Integer>();
+    masterCounts.addAll(masterPartitionCounts.values());
+    slaveCounts.addAll(slavePartitionCounts.values());
+    Collections.sort(masterCounts);
+    Collections.sort(slaveCounts);
+    
+    Assert.assertTrue(masterCounts.get(masterCounts.size() - 1 ) - masterCounts.get(0) <= 1);
+
+    Assert.assertTrue(slaveCounts.get(slaveCounts.size() - 1 ) - slaveCounts.get(0) <= 2);
+    return true;
+  }
+}