You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[33/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
new file mode 100644
index 0000000..627f3d5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
@@ -0,0 +1,179 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDistributedCMMain extends ZkIntegrationTestBase
+{
+
+  @Test
+  public void testDistributedCMMain() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterNamePrefix = className + "_" + methodName;
+    final int n = 5;
+    final int clusterNb = 10;
+
+    System.out.println("START " + clusterNamePrefix + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // setup 10 clusters
+    for (int i = 0; i < clusterNb; i++)
+    {
+      String clusterName = clusterNamePrefix + "0_" + i;
+      String participantName = "localhost" + i;
+      String resourceName = "TestDB" + i;
+      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                              participantName, // participant name prefix
+                              resourceName, // resource name prefix
+                              1, // resources
+                              8, // partitions per resource
+                              n, // number of nodes
+                              3, // replicas
+                              "MasterSlave",
+                              true); // do rebalance
+    }
+
+    // setup controller cluster
+    final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+    TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
+                                                                           // port
+                            "controller", // participant name prefix
+                            clusterNamePrefix, // resource name prefix
+                            1, // resources
+                            clusterNb, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "LeaderStandby",
+                            true); // do rebalance
+
+    // start distributed cluster controllers
+    ClusterController[] controllers = new ClusterController[n + n];
+    for (int i = 0; i < n; i++)
+    {
+      controllers[i] =
+          new ClusterController(controllerClusterName,
+                                "controller_" + i,
+                                ZK_ADDR,
+                                HelixControllerMain.DISTRIBUTED.toString());
+      controllers[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                      controllerClusterName),
+                                                30000);
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+    // start first cluster
+    MockParticipant[] participants = new MockParticipant[n];
+    final String firstClusterName = clusterNamePrefix + "0_0";
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost0_" + (12918 + i);
+      participants[i] =
+          new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+    
+    
+    // add more controllers to controller cluster
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    for (int i = 0; i < n; i++)
+    {
+      String controller = "controller:" + (n + i);
+      setupTool.addInstanceToCluster(controllerClusterName, controller);
+    }
+    setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
+    for (int i = n; i < 2 * n; i++)
+    {
+      controllers[i] =
+          new ClusterController(controllerClusterName,
+                                "controller_" + i,
+                                ZK_ADDR,
+                                HelixControllerMain.DISTRIBUTED.toString());
+      controllers[i].syncStart();
+    }
+
+    
+    // verify controller cluster
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                      controllerClusterName));
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+    
+    // verify first cluster
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+    
+    // stop controller_0-5
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    for (int i = 0; i < n; i++)
+    {
+      LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+      String leaderName = leader.getId();
+      int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+      controllers[j].syncStop();
+      
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                        controllerClusterName));
+      Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+      
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                   firstClusterName));
+      Assert.assertTrue(result, "first cluster NOT in ideal state");
+    }
+
+    
+    // clean up
+    // wait for all zk callbacks done
+    System.out.println("Cleaning up...");
+    Thread.sleep(2000);
+    for (int i = 0; i < 5; i++)
+    {
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                        controllerClusterName));
+      controllers[i].syncStop();
+    }
+    
+    // Thread.sleep(2000);
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterNamePrefix + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
new file mode 100644
index 0000000..ffb17b0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
@@ -0,0 +1,148 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDistributedClusterController extends ZkIntegrationTestBase
+{
+
+  @Test
+  public void testDistributedClusterController() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterNamePrefix = className + "_" + methodName;
+    final int n = 5;
+    final int clusterNb = 10;
+
+    System.out.println("START " + clusterNamePrefix + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // setup 10 clusters
+    for (int i = 0; i < clusterNb; i++)
+    {
+      String clusterName = clusterNamePrefix + "0_" + i;
+      String participantName = "localhost" + i;
+      String resourceName = "TestDB" + i;
+      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                              participantName, // participant name prefix
+                              resourceName, // resource name prefix
+                              1, // resources
+                              8, // partitions per resource
+                              n, // number of nodes
+                              3, // replicas
+                              "MasterSlave",
+                              true); // do rebalance
+    }
+
+    // setup controller cluster
+    final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+    TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
+                                                                           // port
+                            "controller", // participant name prefix
+                            clusterNamePrefix, // resource name prefix
+                            1, // resources
+                            clusterNb, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "LeaderStandby",
+                            true); // do rebalance
+
+    // start distributed cluster controllers
+    ClusterController[] controllers = new ClusterController[n];
+    for (int i = 0; i < n; i++)
+    {
+      controllers[i] =
+          new ClusterController(controllerClusterName,
+                                "controller_" + i,
+                                ZK_ADDR,
+                                HelixControllerMain.DISTRIBUTED.toString());
+      controllers[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                      controllerClusterName),
+                                                30000);
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+    // start first cluster
+    MockParticipant[] participants = new MockParticipant[n];
+    final String firstClusterName = clusterNamePrefix + "0_0";
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost0_" + (12918 + i);
+      participants[i] =
+          new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+    
+    // stop current leader in controller cluster
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    String leaderName = leader.getId();
+    int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+    controllers[j].syncStop();
+    
+    
+    // setup the second cluster
+    MockParticipant[] participants2 = new MockParticipant[n];
+    final String secondClusterName = clusterNamePrefix + "0_1";
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost1_" + (12918 + i);
+      participants2[i] =
+          new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
+      participants2[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 secondClusterName));
+    Assert.assertTrue(result, "second cluster NOT in ideal state");
+
+    // clean up
+    // wait for all zk callbacks done
+    System.out.println("Cleaning up...");
+    Thread.sleep(1000);
+    for (int i = 0; i < 5; i++)
+    {
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                        controllerClusterName));
+      controllers[i].syncStop();
+    }
+
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterNamePrefix + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
new file mode 100644
index 0000000..cd09d92
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -0,0 +1,519 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.TestCommand;
+import org.apache.helix.tools.TestExecutor;
+import org.apache.helix.tools.TestTrigger;
+import org.apache.helix.tools.ZnodeOpArg;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.helix.tools.TestCommand.NodeOpArg;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+
+public class TestDriver
+{
+  private static Logger LOG = Logger.getLogger(TestDriver.class);
+  private static final String ZK_ADDR = ZkIntegrationTestBase.ZK_ADDR;
+
+  // private static final String CLUSTER_PREFIX = "TestDriver";
+  private static final String STATE_MODEL = "MasterSlave";
+  private static final String TEST_DB_PREFIX = "TestDB";
+  private static final int START_PORT = 12918;
+  private static final String CONTROLLER_PREFIX = "controller";
+  private static final String PARTICIPANT_PREFIX = "localhost";
+  private static final Random RANDOM = new Random();
+  private static final PropertyJsonSerializer<ZNRecord> SERIALIZER = new PropertyJsonSerializer<ZNRecord>(
+      ZNRecord.class);
+
+  private static final Map<String, TestInfo> _testInfoMap = new ConcurrentHashMap<String, TestInfo>();
+
+  public static class TestInfo
+  {
+    public final ZkClient _zkClient;
+    public final String _clusterName;
+    public final int _numDb;
+    public final int _numPartitionsPerDb;
+    public final int _numNode;
+    public final int _replica;
+
+    // public final Map<String, ZNRecord> _idealStateMap = new
+    // ConcurrentHashMap<String, ZNRecord>();
+    public final Map<String, StartCMResult> _startCMResultMap = new ConcurrentHashMap<String, StartCMResult>();
+
+    public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
+        int numNode, int replica)
+    {
+      this._clusterName = clusterName;
+      this._zkClient = zkClient;
+      this._numDb = numDb;
+      this._numPartitionsPerDb = numPartitionsPerDb;
+      this._numNode = numNode;
+      this._replica = replica;
+    }
+  }
+
+  public static TestInfo getTestInfo(String uniqClusterName)
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "Cluster hasn't been setup for " + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    return testInfo;
+  }
+
+  public static void setupClusterWithoutRebalance(String uniqClusterName, String zkAddr,
+      int numResources, int numPartitionsPerResource, int numInstances, int replica)
+      throws Exception
+  {
+    setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
+        replica, false);
+  }
+
+  public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
+      int numPartitionsPerResource, int numInstances, int replica) throws Exception
+  {
+    setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
+        replica, true);
+  }
+
+  // public static void setupCluster(String uniqTestName, ZkClient zkClient, int
+  // numDb,
+  // int numPartitionPerDb, int numNodes, int replica, boolean doRebalance)
+  // throws Exception
+  public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
+      int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
+      throws Exception
+  {
+    ZkClient zkClient = new ZkClient(zkAddr);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // String clusterName = CLUSTER_PREFIX + "_" + uniqClusterName;
+    String clusterName = uniqClusterName;
+    if (zkClient.exists("/" + clusterName))
+    {
+      LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName
+          + " is not unique or test has been run without cleaning up zk; deleting it");
+      zkClient.deleteRecursive("/" + clusterName);
+    }
+
+    if (_testInfoMap.containsKey(uniqClusterName))
+    {
+      LOG.warn("test info already exists:" + uniqClusterName
+          + " is not unique or test has been run without cleaning up test info map; removing it");
+      _testInfoMap.remove(uniqClusterName);
+    }
+    TestInfo testInfo = new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource,
+        numInstances, replica);
+    _testInfoMap.put(uniqClusterName, testInfo);
+
+    ClusterSetup setupTool = new ClusterSetup(zkAddr);
+    setupTool.addCluster(clusterName, true);
+
+    for (int i = 0; i < numInstances; i++)
+    {
+      int port = START_PORT + i;
+      setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + ":" + port);
+    }
+
+    for (int i = 0; i < numResources; i++)
+    {
+      String dbName = TEST_DB_PREFIX + i;
+      setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource,
+          STATE_MODEL);
+      if (doRebalance)
+      {
+        setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+
+        // String idealStatePath = "/" + clusterName + "/" +
+        // PropertyType.IDEALSTATES.toString() + "/"
+        // + dbName;
+        // ZNRecord idealState = zkClient.<ZNRecord> readData(idealStatePath);
+        // testInfo._idealStateMap.put(dbName, idealState);
+      }
+    }
+  }
+
+  /**
+   * starting a dummy participant with a given id
+   *
+   * @param uniqueTestName
+   * @param instanceId
+   */
+  public static void startDummyParticipant(String uniqClusterName, int instanceId) throws Exception
+  {
+    startDummyParticipants(uniqClusterName, new int[] { instanceId });
+  }
+
+  public static void startDummyParticipants(String uniqClusterName, int[] instanceIds)
+      throws Exception
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    String clusterName = testInfo._clusterName;
+
+    for (int id : instanceIds)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
+
+      if (testInfo._startCMResultMap.containsKey(instanceName))
+      {
+        LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
+      } else
+      {
+        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
+        testInfo._startCMResultMap.put(instanceName, result);
+        // testInfo._instanceStarted.countDown();
+      }
+    }
+  }
+
+  public static void startController(String uniqClusterName) throws Exception
+  {
+    startController(uniqClusterName, new int[] { 0 });
+  }
+
+  public static void startController(String uniqClusterName, int[] nodeIds) throws Exception
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    String clusterName = testInfo._clusterName;
+
+    for (int id : nodeIds)
+    {
+      String controllerName = CONTROLLER_PREFIX + "_" + id;
+      if (testInfo._startCMResultMap.containsKey(controllerName))
+      {
+        LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
+      } else
+      {
+        StartCMResult result = TestHelper.startController(clusterName, controllerName, ZK_ADDR,
+            HelixControllerMain.STANDALONE);
+        testInfo._startCMResultMap.put(controllerName, result);
+      }
+    }
+  }
+
+  public static void verifyCluster(String uniqClusterName, long beginTime, long timeout)
+      throws Exception
+  {
+    Thread.sleep(beginTime);
+
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    String clusterName = testInfo._clusterName;
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName), timeout);
+    Assert.assertTrue(result);
+  }
+
+  public static void stopCluster(String uniqClusterName) throws Exception
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+    TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
+
+    // stop controller first
+    for (Iterator<Entry<String, StartCMResult>> it = testInfo._startCMResultMap.entrySet()
+        .iterator(); it.hasNext();)
+    {
+      Map.Entry<String, StartCMResult> entry = it.next();
+      String instanceName = entry.getKey();
+      if (instanceName.startsWith(CONTROLLER_PREFIX))
+      {
+        it.remove();
+        HelixManager manager = entry.getValue()._manager;
+        manager.disconnect();
+        Thread thread = entry.getValue()._thread;
+        thread.interrupt();
+      }
+    }
+
+    Thread.sleep(1000);
+
+    // stop the rest
+    for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet())
+    {
+      HelixManager manager = entry.getValue()._manager;
+      manager.disconnect();
+      Thread thread = entry.getValue()._thread;
+      thread.interrupt();
+    }
+
+    testInfo._zkClient.close();
+  }
+
+  public static void stopDummyParticipant(String uniqClusterName, long beginTime, int instanceId)
+      throws Exception
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new Exception(errMsg);
+    }
+
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    // String clusterName = testInfo._clusterName;
+
+    String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
+    StartCMResult result = testInfo._startCMResultMap.remove(failHost);
+
+    // TODO need sync
+    if (result == null || result._manager == null || result._thread == null)
+    {
+      String errMsg = "Dummy participant:" + failHost + " seems not running";
+      LOG.error(errMsg);
+    } else
+    {
+      // System.err.println("try to stop participant: " +
+      // result._manager.getInstanceName());
+      NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
+      TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
+      List<TestCommand> commandList = new ArrayList<TestCommand>();
+      commandList.add(command);
+      TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+    }
+  }
+
+  public static void setIdealState(String uniqClusterName, long beginTime, int percentage)
+      throws Exception
+  {
+    if (!_testInfoMap.containsKey(uniqClusterName))
+    {
+      String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+      throw new IllegalArgumentException(errMsg);
+    }
+    TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+    String clusterName = testInfo._clusterName;
+    List<String> instanceNames = new ArrayList<String>();
+
+    for (int i = 0; i < testInfo._numNode; i++)
+    {
+      int port = START_PORT + i;
+      instanceNames.add(PARTICIPANT_PREFIX + "_" + port);
+    }
+
+    List<TestCommand> commandList = new ArrayList<TestCommand>();
+    for (int i = 0; i < testInfo._numDb; i++)
+    {
+      String dbName = TEST_DB_PREFIX + i;
+      ZNRecord destIS = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
+          testInfo._numPartitionsPerDb, testInfo._replica - 1, dbName, "MASTER", "SLAVE");
+      // destIS.setId(dbName);
+      destIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
+          IdealStateModeProperty.CUSTOMIZED.toString());
+      destIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+          Integer.toString(testInfo._numPartitionsPerDb));
+      destIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
+      destIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
+      // String idealStatePath = "/" + clusterName + "/" +
+      // PropertyType.IDEALSTATES.toString() + "/"
+      // + TEST_DB_PREFIX + i;
+      ZNRecord initIS = new ZNRecord(dbName); // _zkClient.<ZNRecord>
+                                              // readData(idealStatePath);
+      initIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
+          IdealStateModeProperty.CUSTOMIZED.toString());
+      initIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+          Integer.toString(testInfo._numPartitionsPerDb));
+      initIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
+      initIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
+      int totalStep = calcuateNumTransitions(initIS, destIS);
+      // LOG.info("initIS:" + initIS);
+      // LOG.info("destIS:" + destIS);
+      // LOG.info("totalSteps from initIS to destIS:" + totalStep);
+      // System.out.println("initIS:" + initIS);
+      // System.out.println("destIS:" + destIS);
+
+      ZNRecord nextIS;
+      int step = totalStep * percentage / 100;
+      System.out.println("Resource:" + dbName + ", totalSteps from initIS to destIS:" + totalStep
+          + ", walk " + step + " steps(" + percentage + "%)");
+      nextIS = nextIdealState(initIS, destIS, step);
+      // testInfo._idealStateMap.put(dbName, nextIS);
+      String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName,
+          TEST_DB_PREFIX + i);
+      ZnodeOpArg arg = new ZnodeOpArg(idealStatePath, ZnodePropertyType.ZNODE, "+", nextIS);
+      TestCommand command = new TestCommand(CommandType.MODIFY, new TestTrigger(beginTime), arg);
+      commandList.add(command);
+    }
+
+    TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+
+  }
+
+  private static List<String[]> findAllUnfinishPairs(ZNRecord cur, ZNRecord dest)
+  {
+    // find all (host, resource) pairs that haven't reached destination state
+    List<String[]> list = new ArrayList<String[]>();
+    Map<String, Map<String, String>> map = dest.getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : map.entrySet())
+    {
+      String partitionName = entry.getKey();
+      Map<String, String> hostMap = entry.getValue();
+      for (Map.Entry<String, String> hostEntry : hostMap.entrySet())
+      {
+        String host = hostEntry.getKey();
+        String destState = hostEntry.getValue();
+        Map<String, String> curHostMap = cur.getMapField(partitionName);
+
+        String curState = null;
+        if (curHostMap != null)
+        {
+          curState = curHostMap.get(host);
+        }
+
+        String[] pair = new String[3];
+        if (curState == null)
+        {
+          if (destState.equalsIgnoreCase("SLAVE"))
+          {
+            pair[0] = new String(partitionName);
+            pair[1] = new String(host);
+            pair[2] = new String("1"); // number of transitions required
+            list.add(pair);
+          } else if (destState.equalsIgnoreCase("MASTER"))
+          {
+            pair[0] = new String(partitionName);
+            pair[1] = new String(host);
+            pair[2] = new String("2"); // number of transitions required
+            list.add(pair);
+          }
+        } else
+        {
+          if (curState.equalsIgnoreCase("SLAVE") && destState.equalsIgnoreCase("MASTER"))
+          {
+            pair[0] = new String(partitionName);
+            pair[1] = new String(host);
+            pair[2] = new String("1"); // number of transitions required
+            list.add(pair);
+          }
+        }
+      }
+    }
+    return list;
+  }
+
+  private static int calcuateNumTransitions(ZNRecord start, ZNRecord end)
+  {
+    int totalSteps = 0;
+    List<String[]> list = findAllUnfinishPairs(start, end);
+    for (String[] pair : list)
+    {
+      totalSteps += Integer.parseInt(pair[2]);
+    }
+    return totalSteps;
+  }
+
+  private static ZNRecord nextIdealState(final ZNRecord cur, final ZNRecord dest, final int steps)
+      throws PropertyStoreException
+  {
+    // get a deep copy
+    ZNRecord next = SERIALIZER.deserialize(SERIALIZER.serialize(cur));
+    List<String[]> list = findAllUnfinishPairs(cur, dest);
+
+    // randomly pick up pairs that haven't reached destination state and
+    // progress
+    for (int i = 0; i < steps; i++)
+    {
+      int randomInt = RANDOM.nextInt(list.size());
+      String[] pair = list.get(randomInt);
+      String curState = null;
+      Map<String, String> curHostMap = next.getMapField(pair[0]);
+      if (curHostMap != null)
+      {
+        curState = curHostMap.get(pair[1]);
+      }
+      final String destState = dest.getMapField(pair[0]).get(pair[1]);
+
+      // TODO generalize it using state-model
+      if (curState == null && destState != null)
+      {
+        Map<String, String> hostMap = next.getMapField(pair[0]);
+        if (hostMap == null)
+        {
+          hostMap = new HashMap<String, String>();
+        }
+        hostMap.put(pair[1], "SLAVE");
+        next.setMapField(pair[0], hostMap);
+      } else if (curState.equalsIgnoreCase("SLAVE") && destState != null
+          && destState.equalsIgnoreCase("MASTER"))
+      {
+        next.getMapField(pair[0]).put(pair[1], "MASTER");
+      } else
+      {
+        LOG.error("fail to calculate the next ideal state");
+      }
+      curState = next.getMapField(pair[0]).get(pair[1]);
+      if (curState != null && curState.equalsIgnoreCase(destState))
+      {
+        list.remove(randomInt);
+      }
+    }
+
+    LOG.info("nextIS:" + next);
+    return next;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
new file mode 100644
index 0000000..942ba28
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -0,0 +1,200 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDrop extends ZkIntegrationTestBase
+{
+  @Test
+  public void testDropErrorPartition() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    Map<String, Set<String>> errTransitions = new HashMap<String, Set<String>>();
+    errTransitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+    errTransitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errTransitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+      participants[i].syncStart();
+    }
+
+    Map<String, Map<String, String>> errStateMap =
+        new HashMap<String, Map<String, String>>();
+    errStateMap.put("TestDB0", new HashMap<String, String>());
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName,
+                                                                                 errStateMap));
+    Assert.assertTrue(result);
+    
+    // drop resource containing error partitions should not change partitions in error state
+    ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZK_ADDR, "--dropResource", clusterName, "TestDB0"});
+
+
+    // make sure TestDB0_4 and TestDB0_8 partitions stay in ERROR state
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName,
+                                                                                 errStateMap));
+    Assert.assertTrue(result);
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+  
+  @Test
+  public void testDropSchemataResource() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // add schemata resource group
+    String command = "--zkSvr " + ZK_ADDR +" --addResource " + clusterName + 
+        " schemata 1 STORAGE_DEFAULT_SM_SCHEMATA";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    command = "--zkSvr " + ZK_ADDR +" --rebalance " + clusterName + 
+        " schemata 1";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // drop schemata resource group
+    System.out.println("Dropping schemata resource group...");
+    command = "--zkSvr " + ZK_ADDR +" --dropResource " + clusterName + 
+        " schemata";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // make sure schemata external view is empty
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
+    Assert.assertEquals(extView.getPartitionSet().size(), 0, "schemata externalView should be empty but was \"" + extView + "\"");
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
new file mode 100644
index 0000000..6f91df5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  @Test()
+  public void testDropResource() throws Exception
+  {
+    // add a resource to be dropped
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 6, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 3);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+                                 CLUSTER_NAME,
+                                 "MyDB",
+                                 TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+                                                          "localhost_12920", "localhost_12921",
+                                                          "localhost_12922"),
+                                 ZK_ADDR);
+  }
+  
+  @Test()
+  public void testDropResourceWhileNodeDead() throws Exception
+  {
+ // add a resource to be dropped
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB2", 16, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
+    
+    boolean verifyResult = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(verifyResult);
+
+    String hostToKill = "localhost_12920";
+    
+    _startCMResultMap.get(hostToKill)._manager.disconnect();
+    Thread.sleep(1000);
+    _startCMResultMap.get(hostToKill)._thread.interrupt();
+    
+    String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+        CLUSTER_NAME,
+        "MyDB2",
+        TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+                                 /*"localhost_12920",*/ "localhost_12921",
+                                 "localhost_12922"),
+        ZK_ADDR);
+    
+    StartCMResult result =
+        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, hostToKill);
+    _startCMResultMap.put(hostToKill, result);
+    
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+        CLUSTER_NAME,
+        "MyDB2",
+        TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+                                 "localhost_12920", "localhost_12921",
+                                 "localhost_12922"),
+        ZK_ADDR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java b/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
new file mode 100644
index 0000000..c56bdc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicFileClusterManager extends FileCMTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestDynamicFileClusterManager.class);
+
+  @Test()
+  public void testDynamicFileClusterManager()
+  throws Exception
+  {
+    System.out.println("RUN testDynamicFileClusterManager() at " + new Date(System.currentTimeMillis()));
+
+    // add a new db
+    _mgmtTool.addResource(CLUSTER_NAME, "MyDB", 6, STATE_MODEL);
+    rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewFileVerifier(ROOT_PATH, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // drop db
+    _mgmtTool.dropResource(CLUSTER_NAME, "MyDB");
+
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateFile",
+                                 CLUSTER_NAME,
+                                 "MyDB",
+                                 TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+                                                          "localhost_12920", "localhost_12921",
+                                                          "localhost_12922"),
+                                                          _fileStore);
+
+    System.out.println("STOP testDynamicFileClusterManager() at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
new file mode 100644
index 0000000..926d0d5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
@@ -0,0 +1,159 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase
+{
+  static
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+  }
+
+  class EnablePartitionTransition extends MockTransition
+  {
+    int slaveToOfflineCnt = 0;
+    int offlineToSlave = 0;
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      HelixManager manager = context.getManager();
+      String clusterName = manager.getClusterName();
+
+      String instance = message.getTgtName();
+      String partitionName = message.getPartitionName();
+      String fromState = message.getFromState();
+      String toState = message.getToState();
+      if (instance.equals("localhost_12919") && partitionName.equals("TestDB0_0"))
+      {
+        if (fromState.equals("SLAVE") && toState.equals("OFFLINE"))
+        {
+          slaveToOfflineCnt++;
+
+          try
+          {
+            String command =
+                "--zkSvr " + ZK_ADDR + " --enablePartition true " + clusterName
+                    + " localhost_12919 TestDB0 TestDB0_0";
+
+            ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+          }
+          catch (Exception e)
+          {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+
+        }
+        else if (slaveToOfflineCnt > 0 && fromState.equals("OFFLINE") && toState.equals("SLAVE"))
+        {
+          offlineToSlave++;
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testEnablePartitionDuringDisable() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    EnablePartitionTransition transition = new EnablePartitionTransition();
+    MockParticipant[] participants = new MockParticipant[5];
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (instanceName.equals("localhost_12919"))
+      {
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                transition);
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disable partitions
+    String command =
+        "--zkSvr " + ZK_ADDR + " --enablePartition false " + clusterName
+            + " localhost_12919 TestDB0 TestDB0_0";
+
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    // ensure we get 1 slaveToOffline and 1 offlineToSlave after disable partition
+    long startT = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startT < 1000)  // retry in 1s
+    {
+      if (transition.slaveToOfflineCnt > 0 && transition.offlineToSlave > 0)
+      {
+        break;
+      }
+
+      Thread.sleep(10);
+    }
+    long endT = System.currentTimeMillis();
+    System.out.println("1 disable and re-enable took: " + (endT - startT) + "ms");
+    Assert.assertEquals(transition.slaveToOfflineCnt, 1, "should get 1 slaveToOffline transition");
+    Assert.assertEquals(transition.offlineToSlave, 1, "should get 1 offlineToSlave transition");
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
new file mode 100644
index 0000000..3eb01dd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestErrorPartition extends ZkIntegrationTestBase
+{
+  @Test()
+  public void testErrorPartition() throws Exception
+  {
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    System.out.println("START testErrorPartition() at "
+        + new Date(System.currentTimeMillis()));
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            12918,
+                            "localhost",
+                            "TestDB",
+                            1,
+                            10,
+                            5,
+                            3,
+                            "MasterSlave",
+                            true);
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+        {
+          {
+            put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+          }
+        };
+        participants[i] =
+            new MockParticipant(clusterName,
+                                instanceName,
+                                ZK_ADDR,
+                                new ErrTransition(errPartitions));
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      }
+      participants[i].syncStart();
+      // new Thread(participants[i]).start();
+    }
+
+    Map<String, Map<String, String>> errStates =
+        new HashMap<String, Map<String, String>>();
+    errStates.put("TestDB0", new HashMap<String, String>());
+    errStates.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    boolean result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName,
+                                                                                          errStates));
+    Assert.assertTrue(result);
+
+    Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>()
+    {
+      {
+        put("TestDB0_4", TestHelper.setOf("localhost_12918"));
+      }
+    };
+
+    // verify "TestDB0_0", "localhost_12918" is in ERROR state
+    TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+
+    // disable a partition on a node with error state
+    tool.enablePartition(false, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName,
+                                                                                          errStates));
+    Assert.assertTrue(result);
+
+    TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+
+    // disable a node with error state
+    tool.enableInstance(clusterName, "localhost_12918", false);
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName,
+                                                                                          errStates));
+    Assert.assertTrue(result);
+
+    // make sure after restart stale ERROR state is gone
+    tool.enablePartition(true, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
+    tool.enableInstance(clusterName, "localhost_12918", true);
+
+    participants[0].syncStop();
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+    participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR);
+    new Thread(participants[0]).start();
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+
+    System.out.println("END testErrorPartition() at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
new file mode 100644
index 0000000..a43cd34
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration;
+
+import java.util.Map;
+
+import org.apache.helix.TestEspressoStorageClusterIdealState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExpandCluster extends ZkStandAloneCMTestBase
+{
+@Test
+  public void testExpandCluster() throws Exception
+  {
+    String DB2 = "TestDB2";
+    int partitions = 100;
+    int replica = 3;
+    _setupTool.addResourceToCluster(CLUSTER_NAME, DB2, partitions, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, DB2, replica, "keyX");
+    
+    String DB3 = "TestDB3";
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, DB3, partitions, STATE_MODEL);
+    
+    IdealState testDB0 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    IdealState testDB2 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB2);
+    IdealState testDB3 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB3);
+    
+    for (int i = 0; i < 5; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (27960 + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    String command = "-zkSvr localhost:2183 -expandCluster " + CLUSTER_NAME;
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    
+    IdealState testDB0_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    IdealState testDB2_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB2);
+    IdealState testDB3_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB3);
+    
+    Map<String, Object> resultOld2 = ClusterSetup.buildInternalIdealState(testDB2);
+    Map<String, Object> result2 = ClusterSetup.buildInternalIdealState(testDB2_1);
+    
+    TestEspressoStorageClusterIdealState.Verify(result2, partitions, replica - 1);
+
+    Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
+    double[] result = TestEspressoStorageClusterIdealState.compareResult(resultOld2, result2);
+    masterKeepRatio = result[0];
+    slaveKeepRatio = result[1];
+    Assert.assertTrue(masterKeepRatio > 0.49 && masterKeepRatio < 0.51);
+    
+    Assert.assertTrue(testDB3_1.getRecord().getListFields().size() == 0);
+    
+    // partitions should stay as same
+    Assert.assertTrue(testDB0_1.getRecord().getListFields().keySet().containsAll(testDB0.getRecord().getListFields().keySet()));
+    Assert.assertTrue(testDB0_1.getRecord().getListFields().size() == testDB0.getRecord().getListFields().size());
+    Assert.assertTrue(testDB2_1.getRecord().getMapFields().keySet().containsAll(testDB2.getRecord().getMapFields().keySet()));
+    Assert.assertTrue(testDB2_1.getRecord().getMapFields().size() == testDB2.getRecord().getMapFields().size());
+    Assert.assertTrue(testDB3_1.getRecord().getMapFields().keySet().containsAll(testDB3.getRecord().getMapFields().keySet()));
+    Assert.assertTrue(testDB3_1.getRecord().getMapFields().size() == testDB3.getRecord().getMapFields().size());
+    
+    Map<String, Object> resultOld = ClusterSetup.buildInternalIdealState(testDB0);
+    Map<String, Object> resultNew = ClusterSetup.buildInternalIdealState(testDB0_1);
+    
+    result = TestEspressoStorageClusterIdealState.compareResult(resultOld, resultNew);
+    masterKeepRatio = result[0];
+    slaveKeepRatio = result[1];
+    Assert.assertTrue(masterKeepRatio > 0.49 && masterKeepRatio < 0.51);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
new file mode 100644
index 0000000..3a4fcbe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
@@ -0,0 +1,36 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.ExternalCommand;
+import org.apache.helix.ScriptTestHelper;
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExternalCmd
+{
+
+  @Test
+  public void testExternalCmd() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+    
+    System.out.println("START " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    ExternalCommand cmd = ScriptTestHelper.runCommandLineTest("dummy.sh");
+    String output = cmd.getStringOutput("UTF8");
+    int idx = output.indexOf("this is a dummy test for verify ExternalCommand works");
+    Assert.assertNotSame(idx, -1);
+    
+    System.out.println("END " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
new file mode 100644
index 0000000..63bd388
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
@@ -0,0 +1,89 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExternalViewUpdates extends ZkIntegrationTestBase
+{
+  @Test
+  public void testExternalViewUpdates() throws Exception
+  {
+    System.out.println("START testExternalViewUpdates at "
+        + new Date(System.currentTimeMillis()));
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+    int resourceNb = 10;
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            resourceNb, // resources
+                            1, // partitions per resource
+                            5, // number of nodes
+                            1, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              clusterName));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // need to verify that each ExternalView's version number is 2
+    Builder keyBuilder = new Builder(clusterName);
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    String parentPath = keyBuilder.externalViews().getPath();
+    List<String> childNames = accessor.getChildNames(parentPath, 0);
+    
+    List<String> paths = new ArrayList<String>();
+    for (String name : childNames)
+    {
+      paths.add(parentPath + "/" + name);
+    }
+    
+//    Stat[] stats = accessor.getStats(paths);
+    for (String path : paths)
+    {
+      Stat stat = accessor.getStat(path, 0);
+      Assert.assertTrue(stat.getVersion() <= 2, "ExternalView should be updated at most 2 times");
+    }
+    
+    // TODO: need stop controller and participants
+    
+    System.out.println("END testExternalViewUpdates at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java b/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
new file mode 100644
index 0000000..4deaf36
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
@@ -0,0 +1,52 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.ExternalCommand;
+import org.apache.helix.ScriptTestHelper;
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestFailOverPerf1kp
+{
+  @Test
+  public void testFailOverPerf1kp() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+    
+    System.out.println("START " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    ExternalCommand cmd = ScriptTestHelper.runCommandLineTest("helix_random_kill_local_startzk.sh");
+    String output = cmd.getStringOutput("UTF8");
+    int i = getStateTransitionLatency(0, output);
+    int j = output.indexOf("ms", i);
+    long latency = Long.parseLong(output.substring(i, j));
+    System.out.println("startup latency: " + latency);
+    
+    i = getStateTransitionLatency(i, output);
+    j = output.indexOf("ms", i);
+    latency = Long.parseLong(output.substring(i, j));
+    System.out.println("failover latency: " + latency);
+    Assert.assertTrue(latency < 800, "failover latency for 1k partition test should < 800ms");
+    
+    System.out.println("END " + testName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+  
+  int getStateTransitionLatency(int start, String output)
+  {
+    final String pattern = "state transition latency: ";
+    int i = output.indexOf(pattern, start) + pattern.length();
+//    String latencyStr = output.substring(i, j);
+//    System.out.println(latencyStr);
+    return i;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java b/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
new file mode 100644
index 0000000..b1b7aac
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+// TODO remove this test
+public class TestFileBasedHelixManager
+{
+  @Test ()
+  public void testFileBasedHelixManager() throws Exception
+  {
+    List<StaticFileHelixManager.DBParam> dbParams = new ArrayList<StaticFileHelixManager.DBParam>();
+    dbParams.add(new StaticFileHelixManager.DBParam("BizFollow", 1));
+    dbParams.add(new StaticFileHelixManager.DBParam("BizProfile", 1));
+    dbParams.add(new StaticFileHelixManager.DBParam("EspressoDB", 10));
+    dbParams.add(new StaticFileHelixManager.DBParam("MailboxDB", 128));
+    dbParams.add(new StaticFileHelixManager.DBParam("MyDB", 8));
+    dbParams.add(new StaticFileHelixManager.DBParam("schemata", 1));
+    String[] nodesInfo =
+    { "localhost:8900", "localhost:8901", "localhost:8902", "localhost:8903",
+        "localhost:8904" };
+    
+    String file = "/tmp/clusterView.json";
+    int replica = 0;
+    // ClusterViewSerializer serializer = new ClusterViewSerializer(file);
+    ClusterView view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, replica);
+    view.setExternalView(new LinkedList<ZNRecord>());
+    ClusterViewSerializer.serialize(view, new File(file));
+    ClusterView restoredView = ClusterViewSerializer.deserialize(new File(file));
+    
+    verifyClusterViews(view, restoredView);
+  }
+  
+  public void verifyClusterViews(ClusterView view1, ClusterView view2)
+  {
+    AssertJUnit.assertEquals(view1.getPropertyLists().size(), view2.getPropertyLists().size());
+    AssertJUnit.assertEquals(view1.getExternalView().size(), view2.getExternalView().size());
+    AssertJUnit.assertEquals(view1.getMemberInstanceMap().size(), view2.getMemberInstanceMap().size());
+    AssertJUnit.assertEquals(view1.getInstances().size(), view2.getInstances().size());
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
new file mode 100644
index 0000000..f07e29d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
@@ -0,0 +1,194 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestGroupMessage extends ZkIntegrationTestBase
+{
+  class TestZkChildListener implements IZkChildListener
+  {
+    int _maxNbOfChilds = 0;
+    
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+    {
+      System.out.println(parentPath + " has " + currentChilds.size() + " messages");
+      if (currentChilds.size() > _maxNbOfChilds)
+      {
+        _maxNbOfChilds = currentChilds.size();
+      }
+    }
+    
+  }
+  
+  @Test
+  public void testBasic() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            32, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    
+    // enable group message
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setGroupMessageMode(true);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    // registry a message listener so we know how many message generated
+    TestZkChildListener listener = new TestZkChildListener();
+    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+    
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+  
+  // a non-group-message run followed by a group-message-enabled run
+  @Test
+  public void testChangeGroupMessageMode() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            32, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // stop all participants
+    Thread.sleep(1000);
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+    
+    // enable group message
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setGroupMessageMode(true);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    // registry a message listener so we know how many message generated
+    TestZkChildListener listener = new TestZkChildListener();
+    _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+    // restart all participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+    
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+    
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}