Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[32/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
new file mode 100644
index 0000000..5f93f8f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -0,0 +1,227 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAddClusterV2 extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestAddClusterV2.class);
+
+  protected static final int CLUSTER_NR = 10;
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected ClusterSetup _setupTool = null;
+  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected static final String TEST_DB = "TestDB";
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CONTROLLER_CLUSTER;
+    if (_gZkClient.exists(namespace))
+    {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    for (int i = 0; i < CLUSTER_NR; i++)
+    {
+      namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+      if (_gZkClient.exists(namespace))
+      {
+        _gZkClient.deleteRecursive(namespace);
+      }
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+
+    // setup CONTROLLER_CLUSTER
+    _setupTool.addCluster(CONTROLLER_CLUSTER, true);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String controllerName = CONTROLLER_PREFIX + ":" + i;
+      _setupTool.addInstanceToCluster(CONTROLLER_CLUSTER, controllerName);
+    }
+
+
+    // setup cluster of clusters
+    for (int i = 0; i < CLUSTER_NR; i++)
+    {
+      String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+      _setupTool.addCluster(clusterName, true);
+      _setupTool.activateCluster(clusterName,  CONTROLLER_CLUSTER, true);
+    }
+
+    final String firstCluster = CLUSTER_PREFIX + "_" + CLASS_NAME + "_0";
+    setupStorageCluster(_setupTool, firstCluster, TEST_DB, 20, PARTICIPANT_PREFIX,
+                        START_PORT, "MasterSlave", 3, true);
+
+    // start dummy participants for the first cluster
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      if (_startCMResultMap.get(instanceName) != null)
+      {
+        LOG.error("fail to start participant:" + instanceName
+                     + "(participant with the same name already running");
+      }
+      else
+      {
+        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster,
+                                                            instanceName);
+        _startCMResultMap.put(instanceName, result);
+      }
+    }
+
+    // start distributed cluster controllers
+    for (int i = 0; i < 5; i++)
+    {
+      String controllerName = CONTROLLER_PREFIX + "_" + i;
+      if (_startCMResultMap.get(controllerName) != null)
+      {
+        LOG.error("fail to start controller:" + controllerName
+                     + "(controller with the same name already running");
+      }
+      else
+      {
+        StartCMResult result = TestHelper.startController(CONTROLLER_CLUSTER,
+                                                                 controllerName,
+                                                                 ZK_ADDR,
+                                                                 HelixControllerMain.DISTRIBUTED);
+        _startCMResultMap.put(controllerName, result);
+      }
+    }
+
+    verifyClusters();
+  }
+
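+  // placeholder test method; the cluster setup and verification for this test run
+  // in beforeClass() and afterClass()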
+  @Test
+  public void Test()
+  {
+
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception
+  {
+    System.out.println("AFTERCLASS " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    /**
+     * shutdown order:
+     *   1) pause the leader (optional)
+     *   2) disconnect all controllers
+     *   3) disconnect leader/disconnect participant
+     */
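+    // the non-leader controllers are disconnected first, re-verifying cluster
+    // convergence after each one drops; the leader is disconnected last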
+    String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
+    // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
+
+    StartCMResult result;
+
+    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
+
+    while (it.hasNext())
+    {
+      String instanceName = it.next().getKey();
+      if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX))
+      {
+        result = _startCMResultMap.get(instanceName);
+        result._manager.disconnect();
+        result._thread.interrupt();
+        it.remove();
+      }
+      verifyClusters();
+    }
+
+    result = _startCMResultMap.remove(leader);
+    result._manager.disconnect();
+    result._thread.interrupt();
+
+    it = _startCMResultMap.entrySet().iterator();
+    while (it.hasNext())
+    {
+      String instanceName = it.next().getKey();
+      result = _startCMResultMap.get(instanceName);
+      result._manager.disconnect();
+      result._thread.interrupt();
+      it.remove();
+    }
+
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+
+
+  /**
+   * verify the external view (against the best possible state)
+   *   in the controller cluster and the first cluster
+   */
+  protected void verifyClusters()
+  {
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CONTROLLER_CLUSTER));
+    Assert.assertTrue(result);
+    
+    result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_PREFIX + "_" + CLASS_NAME + "_0"));
+    Assert.assertTrue(result);
+  }
+
+  protected void pauseController(DataAccessor clusterDataAccessor)
+  {
+    clusterDataAccessor.setProperty(PropertyType.PAUSE, new PauseSignal("pause"));
+  }
+
+  protected void setupStorageCluster(ClusterSetup setupTool, String clusterName,
+       String dbName, int partitionNr, String prefix, int startPort, String stateModel, int replica, boolean rebalance)
+  {
+    setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = prefix + ":" + (startPort + i);
+      setupTool.addInstanceToCluster(clusterName, instanceName);
+    }
+    if(rebalance)
+    {
+      setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
new file mode 100644
index 0000000..524cc1d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -0,0 +1,238 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase
+{
+  private static Logger LOG       =
+                                      Logger.getLogger(TestAddNodeAfterControllerStart.class);
+  final String          className = getShortClassName();
+
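+  // test-only subclass that re-exposes ZKHelixManager's callback handlers so the
+  // test can check which ZooKeeper paths have handlers attached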
+  class ZkClusterManagerWithGetHandlers extends ZKHelixManager
+  {
+    public ZkClusterManagerWithGetHandlers(String clusterName,
+                                           String instanceName,
+                                           InstanceType instanceType,
+                                           String zkConnectString) throws Exception
+    {
+      super(clusterName, instanceName, instanceType, zkConnectString);
+    }
+
+    @Override
+    public List<CallbackHandler> getHandlers()
+    {
+      return super.getHandlers();
+    }
+
+  }
+
+  @Test
+  public void testStandalone() throws Exception
+  {
+    String clusterName = className + "_standalone";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    final int nodeNr = 5;
+    
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            12918,
+                            "localhost",
+                            "TestDB",
+                            1,
+                            20,
+                            nodeNr - 1,
+                            3,
+                            "MasterSlave",
+                            true);
+
+    MockParticipant[] participants = new MockParticipant[nodeNr];
+    for (int i = 0; i < nodeNr - 1; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      new Thread(participants[i]).start();
+    }
+
+    ZkClusterManagerWithGetHandlers controller =
+        new ZkClusterManagerWithGetHandlers(clusterName,
+                                            "controller_0",
+                                            InstanceType.CONTROLLER,
+                                            ZK_ADDR);
+    controller.connect();
+    boolean result;
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+    String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
+    result = checkHandlers(controller.getHandlers(), msgPath);
+    Assert.assertTrue(result);
+
+    _gSetupTool.addInstanceToCluster(clusterName, "localhost:12922");
+    _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
+
+    participants[nodeNr - 1] =
+        new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+    new Thread(participants[nodeNr - 1]).start();
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+    msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12922");
+    result = checkHandlers(controller.getHandlers(), msgPath);
+    Assert.assertTrue(result);
+
+    // clean up
+    controller.disconnect();
+    for (int i = 0; i < nodeNr; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDistributed() throws Exception
+  {
+    String clusterName = className + "_distributed";
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // setup grand cluster
+    TestHelper.setupCluster("GRAND_" + clusterName,
+                            ZK_ADDR,
+                            0,
+                            "controller",
+                            null,
+                            0,
+                            0,
+                            1,
+                            0,
+                            null,
+                            true);
+   
+    TestHelper.startController("GRAND_" + clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.DISTRIBUTED);
+    
+    // setup cluster
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool.activateCluster(clusterName, "GRAND_" + clusterName, true);    // addCluster2
+
+    boolean result;
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          "GRAND_" + clusterName));
+    Assert.assertTrue(result);
+    
+    // add node/resource, and do rebalance
+    final int nodeNr = 2;
+    for (int i = 0; i < nodeNr - 1; i++)
+    {
+      int port = 12918 + i;
+      _gSetupTool.addInstanceToCluster(clusterName, "localhost:" + port);
+    }
+
+    _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
+    _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
+    
+    MockParticipant[] participants = new MockParticipant[nodeNr];
+    for (int i = 0; i < nodeNr - 1; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      participants[i].syncStart();
+//      new Thread(participants[i]).start();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                          clusterName));
+    Assert.assertTrue(result);
+    
+    // check if controller_0 has message listener for localhost_12918
+    String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
+    int numberOfListeners = TestHelper.numberOfListeners(ZK_ADDR, msgPath);
+    // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
+    Assert.assertEquals(numberOfListeners, 2);  // one listener from the participant, one from the controller
+
+    _gSetupTool.addInstanceToCluster(clusterName, "localhost:12919");
+    _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
+
+    participants[nodeNr - 1] =
+        new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+    participants[nodeNr - 1].syncStart();
+//    new Thread(participants[nodeNr - 1]).start();
+    
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                        clusterName));
+    Assert.assertTrue(result);
+    // check if controller_0 has message listener for localhost_12919
+    msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12919");
+    numberOfListeners = TestHelper.numberOfListeners(ZK_ADDR, msgPath);
+    // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
+    Assert.assertEquals(numberOfListeners, 2);  // one listener from the participant, one from the controller
+
+    // clean up
+    for (int i = 0; i < nodeNr; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
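+  // returns true if one of the given callback handlers is registered on the given ZooKeeper path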
+  boolean checkHandlers(List<CallbackHandler> handlers, String path)
+  {
+//    System.out.println(handlers.size() + " handlers: ");
+    for (CallbackHandler handler : handlers)
+    {
+//      System.out.println(handler.getPath());
+      if (handler.getPath().equals(path))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  // TODO: need to add a test case for ParticipantCodeRunner
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
new file mode 100644
index 0000000..a9b46fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -0,0 +1,126 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.MockMSModelFactory;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
+{
+  @Test
+  public void testAddStateModelFactoryAfterConnect() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // add a new ideal state that refers to a state model factory that has not been registered yet
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addResourceToCluster(clusterName, "TestDB1", 16, "MasterSlave");
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB1"));
+    idealState.setStateModelFactoryName("TestDB1_Factory");
+    accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+    setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+    // external view for TestDB1 should be empty
+    ExternalView extView = null;
+    long start = System.currentTimeMillis();
+    while (extView == null)
+    {
+      Thread.sleep(50);
+      extView = accessor.getProperty(keyBuilder.externalView("TestDB1"));
+
+      long now = System.currentTimeMillis();
+      if (now - start > 5000)
+      {
+        Assert.fail("Timeout waiting for an empty external view of TestDB1");
+      }
+    }
+    Assert.assertEquals(extView.getRecord().getMapFields().size(),
+                        0,
+                        "External view for TestDB1 should be empty since TestDB1 is added without a state model factory");
+
+    // register "TestDB1_Factory" state model factory
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].getManager()
+                     .getStateMachineEngine()
+                     .registerStateModelFactory("MasterSlave",
+                                                new MockMSModelFactory(),
+                                                "TestDB1_Factory");
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
new file mode 100644
index 0000000..ccedeac
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -0,0 +1,87 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase
+{
+  @Test
+  public void testAutoIsWithEmptyMap() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "LeaderStandby", false); // do not rebalance
+
+    // calculate and set custom ideal state
+    String idealPath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+    ZNRecord curIdealState = _gZkClient.readData(idealPath);
+
+    List<String> instanceNames = new ArrayList<String>(5);
+    for (int i = 0; i < 5; i++)
+    {
+      int port = 12918 + i;
+      instanceNames.add("localhost_" + port);
+    }
+    ZNRecord idealState = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames, 10,
+        2, "TestDB0", "LEADER", "STANDBY");
+    // System.out.println(idealState);
+    // curIdealState.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.toString(),
+    // "CUSTOMIZED");
+    curIdealState.setSimpleField(IdealState.IdealStateProperty.REPLICAS.toString(), "3");
+
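+    // copy only the preference lists; the map fields are left empty, which is the
+    // "auto ideal state with empty map" case this test exercises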
+    curIdealState.setListFields(idealState.getListFields());
+    _gZkClient.writeData(idealPath, curIdealState);
+
+    // start controller
+    TestHelper
+        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[5];
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(
+        ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
new file mode 100644
index 0000000..520daa3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -0,0 +1,213 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+  
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    String namespace = "/" + CLUSTER_NAME;
+    if (_zkClient.exists(namespace))
+    {
+      _zkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL, IdealStateModeProperty.AUTO_REBALANCE.toString());
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+    
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      if (_startCMResultMap.get(instanceName) != null)
+      {
+        LOG.error("fail to start particpant:" + instanceName
+            + "(participant with same name already exists)");
+      }
+      else
+      {
+        StartCMResult result =
+            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+        _startCMResultMap.put(instanceName, result);
+      }
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    StartCMResult startResult =
+        TestHelper.startController(CLUSTER_NAME,
+                                   controllerName,
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    _startCMResultMap.put(controllerName, startResult);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+
+    Assert.assertTrue(result);
+  }
+  
+  @Test()
+  public void testAutoRebalance() throws Exception
+  {
+    
+    // kill 1 node
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    _startCMResultMap.get(instanceName)._manager.disconnect();
+    Thread.sleep(1000);
+    _startCMResultMap.get(instanceName)._thread.interrupt();
+    
+    //verifyBalanceExternalView();
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+    Assert.assertTrue(result);
+    
+    // add 2 nodes
+    for (int i = 0; i < 2; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (1000 + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      
+      StartCMResult resultx =
+          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+      _startCMResultMap.put(storageNodeName, resultx);
+    }
+    Thread.sleep(1000);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+    Assert.assertTrue(result);
+  }
+  
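+  /**
+   * Checks that the replicas in the given masterState are evenly balanced across
+   * instances: each instance holds either partitionCount/instances such replicas or
+   * one more (the extra one only when the partitions do not divide evenly), and the
+   * per-instance counts add up to partitionCount.
+   */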
+  static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances)
+  {
+    Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
+    for(String partitionName : externalView.getMapFields().keySet())
+    {
+      Map<String, String> assignmentMap = externalView.getMapField(partitionName);
+      //Assert.assertTrue(assignmentMap.size() >= replica);
+      for(String instance : assignmentMap.keySet())
+      {
+        if(assignmentMap.get(instance).equals(masterState))
+        {
+          if(!masterPartitionsCountMap.containsKey(instance))
+          {
+            masterPartitionsCountMap.put(instance, 0);
+          }
+          masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
+        }
+      }
+    }
+    
+    int perInstancePartition = partitionCount / instances;
+    
+    int totalCount = 0;
+    for(String instanceName : masterPartitionsCountMap.keySet())
+    {
+      int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+      totalCount += instancePartitionCount;
+      if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 ))
+      {
+        return false;
+      }
+      if(instancePartitionCount == perInstancePartition +1)
+      {
+        if(partitionCount % instances == 0)
+        {
+          return false;
+        }
+      }
+    }
+    if(partitionCount != totalCount)
+    {
+      return false;
+    }
+    return true;
+    
+  }
+  
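+  /**
+   * ZkVerifier that checks the master (top-state) replicas of a resource are evenly
+   * spread over the live instances. It reads the ideal state and state model
+   * definition from ZooKeeper and delegates the counting to verifyBalanceExternalView().
+   *
+   * Typical use (as in the test above):
+   *   ClusterStateVerifier.verifyByZkCallback(
+   *       new ExternalViewBalancedVerifier(_zkClient, CLUSTER_NAME, TEST_DB));
+   */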
+  public static class ExternalViewBalancedVerifier implements ZkVerifier
+  {
+    ZkClient _client;
+    String _clusterName;
+    String _resourceName;
+    
+    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
+    {
+      _client = client;
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+    }
+    @Override
+    public boolean verify()
+    {
+      HelixDataAccessor accessor =
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
+      Builder keyBuilder = accessor.keyBuilder();
+      IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
+      int numberOfPartitions = idealState.getRecord().getListFields().size();
+      ClusterDataCache cache = new ClusterDataCache();
+      cache.refresh(accessor);
+      String masterValue =
+          cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
+               .getStatesPriorityList().get(0);
+      int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+      ZNRecord extViewRecord =
+          accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord();
+      return verifyBalanceExternalView(extViewRecord, numberOfPartitions, masterValue,
+                                       replicas, cache.getLiveInstances().size());
+    }
+
+    @Override
+    public ZkClient getZkClient()
+    {
+      return _client;
+    }
+
+    @Override
+    public String getClusterName()
+    {
+      return _clusterName;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
new file mode 100644
index 0000000..f817a39
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
@@ -0,0 +1,61 @@
+package org.apache.helix.integration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBasicSpectator extends ZkStandAloneCMTestBase implements ExternalViewChangeListener
+{
+  Map<String, Integer> _externalViewChanges = new HashMap<String, Integer>();
+  
+  @Test
+  public void TestSpectator() throws Exception
+  {
+    HelixManager relayHelixManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+        null,
+        InstanceType.SPECTATOR,
+        ZK_ADDR);
+
+    relayHelixManager.connect();
+    relayHelixManager.addExternalViewChangeListener(this);
+    
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+    
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    
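+    // the spectator should have received external-view change callbacks for both
+    // the newly added NextDB and the pre-existing TEST_DB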
+    Assert.assertTrue(_externalViewChanges.containsKey("NextDB"));
+    Assert.assertTrue(_externalViewChanges.containsKey(TEST_DB));
+    
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext)
+  {
+    for(ExternalView view : externalViewList)
+    {
+      if(!_externalViewChanges.containsKey(view.getResourceName()))
+      {
+        _externalViewChanges.put(view.getResourceName(), 1);
+      }
+      else
+      {
+        _externalViewChanges.put(view.getResourceName(), _externalViewChanges.get(view.getResourceName())+ 1);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
new file mode 100644
index 0000000..605ce63
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -0,0 +1,87 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBucketizedResource extends ZkIntegrationTestBase
+{
+  @Test()
+  public void testBucketizedResource() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    
+    MockParticipant[] participants = new MockParticipant[5];
+//    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+    
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    // String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
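+    // turn on bucketizing for TestDB0 by giving its ideal state a non-zero bucket size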
+    idealState.setBucketSize(1);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              clusterName));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+    
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
new file mode 100644
index 0000000..d523867
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.testng.annotations.Test;
+
+public class TestCMWithFailParticipant extends ZkIntegrationTestBase
+{
+  // ZkClient _zkClient;
+  //
+  // @BeforeClass ()
+  // public void beforeClass() throws Exception
+  // {
+  // _zkClient = new ZkClient(ZK_ADDR);
+  // _zkClient.setZkSerializer(new ZNRecordSerializer());
+  // }
+  //
+  //
+  // @AfterClass
+  // public void afterClass()
+  // {
+  // _zkClient.close();
+  // }
+
+  @Test()
+  public void testCMWithFailParticipant() throws Exception
+  {
+    int numResources = 1;
+    int numPartitionsPerResource = 10;
+    int numInstance = 5;
+    int replica = 3;
+
+    String uniqClusterName = "TestFail_" + "rg" + numResources + "_p" + numPartitionsPerResource
+        + "_n" + numInstance + "_r" + replica;
+    System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestDriver.setupCluster(uniqClusterName, ZK_ADDR, numResources, numPartitionsPerResource,
+        numInstance, replica);
+
+    for (int i = 0; i < numInstance; i++)
+    {
+      TestDriver.startDummyParticipant(uniqClusterName, i);
+    }
+    TestDriver.startController(uniqClusterName);
+
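+    // stop one of the participants and verify that the cluster still converges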
+    TestDriver.stopDummyParticipant(uniqClusterName, 2000, 0);
+    TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+    TestDriver.stopCluster(uniqClusterName);
+
+    System.out.println("END " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
new file mode 100644
index 0000000..5321183
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestCarryOverBadCurState extends ZkIntegrationTestBase
+{
+  @Test
+  public void testCarryOverBadCurState() throws Exception
+  {
+    System.out.println("START testCarryOverBadCurState at "
+        + new Date(System.currentTimeMillis()));
+
+    String clusterName = getShortClassName();
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // add a bad current state
+    ZNRecord badCurState = new ZNRecord("TestDB0");
+    String path = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_12918", "session_0", "TestDB0");
+    _gZkClient.createPersistent(path, true);
+    _gZkClient.writeData(path, badCurState);
+    
+    
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              clusterName));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    System.out.println("END testCarryOverBadCurState at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
new file mode 100644
index 0000000..c6613f4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.HelixUtil;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestClusterStartsup extends ZkStandAloneCMTestBase
+{
+  void setupCluster() throws HelixException
+  {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_zkClient.exists(namespace))
+    {
+      _zkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String storageNodeName = "localhost:" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+  }
+
+
+  @Override
+  @BeforeClass()
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+  }
+
+  @Override
+  @AfterClass()
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  @Test()
+  public void testParticipantStartUp() throws Exception
+  {
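+    // delete different required ZNodes from the cluster structure and verify that a
+    // participant's connect() fails while the structure is incomplete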
+    setupCluster();
+    String controllerMsgPath = HelixUtil.getControllerPropertyPath(CLUSTER_NAME, PropertyType.MESSAGES_CONTROLLER);
+    _zkClient.deleteRecursive(controllerMsgPath);
+    HelixManager manager = null;
+
+    try
+    {
+      manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+                                                          "localhost_" + (START_PORT + 1),
+                                                          InstanceType.PARTICIPANT,
+                                                          ZK_ADDR);
+      manager.connect();
+      Assert.fail("Should fail on connect() since cluster structure is not set up");
+    }
+    catch(HelixException e)
+    {
+      // OK
+    }
+
+    if(manager != null)
+    {
+      AssertJUnit.assertFalse(manager.isConnected());
+    }
+
+    try
+    {
+      manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+                                                          "localhost_" + (START_PORT + 3),
+                                                          InstanceType.PARTICIPANT,
+                                                          ZK_ADDR);
+      manager.connect();
+      Assert.fail("Should fail on connect() since cluster structure is not set up");
+    }
+    catch(HelixException e)
+    {
+      // OK
+    }
+
+    if(manager != null)
+    {
+      AssertJUnit.assertFalse(manager.isConnected());
+    }
+
+    setupCluster();
+    String stateModelPath = HelixUtil.getStateModelDefinitionPath(CLUSTER_NAME);
+    _zkClient.deleteRecursive(stateModelPath);
+
+    try
+    {
+      manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+                                                          "localhost_" + (START_PORT + 1),
+                                                          InstanceType.PARTICIPANT,
+                                                          ZK_ADDR);
+      manager.connect();
+      Assert.fail("Should fail on connect() since cluster structure is not set up");
+    }
+    catch(HelixException e)
+    {
+      // OK
+    }
+    if(manager != null)
+    {
+      AssertJUnit.assertFalse(manager.isConnected());
+    }
+
+    setupCluster();
+    String instanceStatusUpdatePath = HelixUtil.getInstancePropertyPath(CLUSTER_NAME, "localhost_" + (START_PORT + 1), PropertyType.STATUSUPDATES);
+    _zkClient.deleteRecursive(instanceStatusUpdatePath);
+
+    try
+    {
+      manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+                                                          "localhost_" + (START_PORT + 1),
+                                                          InstanceType.PARTICIPANT,
+                                                          ZK_ADDR);
+      manager.connect();
+      Assert.fail("Should fail on connect() since cluster structure is not set up");
+    }
+    catch(HelixException e)
+    {
+      // OK
+    }
+    if(manager != null)
+    {
+      AssertJUnit.assertFalse(manager.isConnected());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
new file mode 100644
index 0000000..90f72a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestCustomIdealState extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestCustomIdealState.class);
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+    _zkClient.close();
+  }
+
+  @Test
+  public void testBasic() throws Exception
+  {
+
+    int numResources = 2;
+    int numPartitionsPerResource = 100;
+    int numInstance = 5;
+    int replica = 3;
+
+    String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+        + "_n" + numInstance + "_r" + replica + "_basic";
+    System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+        numPartitionsPerResource, numInstance, replica);
+
+    for (int i = 0; i < numInstance; i++)
+    {
+      TestDriver.startDummyParticipant(uniqClusterName, i);
+    }
+    TestDriver.startController(uniqClusterName);
+
+    TestDriver.setIdealState(uniqClusterName, 2000, 50);
+    TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+
+    TestDriver.stopCluster(uniqClusterName);
+
+    System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testNonAliveInstances() throws Exception
+  {
+    int numResources = 2;
+    int numPartitionsPerResource = 50;
+    int numInstance = 5;
+    int replica = 3;
+
+    String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+        + "_n" + numInstance + "_r" + replica + "_nonalive";
+    System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+        numPartitionsPerResource, numInstance, replica);
+
+    for (int i = 0; i < numInstance / 2; i++)
+    {
+      TestDriver.startDummyParticipant(uniqClusterName, i);
+    }
+
+    TestDriver.startController(uniqClusterName);
+    TestDriver.setIdealState(uniqClusterName, 0, 100);
+
+    // wait some time for the customized ideal state to be populated
+    Thread.sleep(1000);
+
+    // start the rest of participants after ideal state is set
+    for (int i = numInstance / 2; i < numInstance; i++)
+    {
+      TestDriver.startDummyParticipant(uniqClusterName, i);
+    }
+
+    TestDriver.verifyCluster(uniqClusterName, 4000, 50 * 1000);
+
+    TestDriver.stopCluster(uniqClusterName);
+
+    System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test()
+  public void testDrop() throws Exception
+  {
+    int numResources = 2;
+    int numPartitionsPerResource = 50;
+    int numInstance = 5;
+    int replica = 3;
+
+    String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+        + "_n" + numInstance + "_r" + replica + "_drop";
+
+    System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+    TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+        numPartitionsPerResource, numInstance, replica);
+
+    for (int i = 0; i < numInstance; i++)
+    {
+      TestDriver.startDummyParticipant(uniqClusterName, i);
+    }
+    TestDriver.startController(uniqClusterName);
+    TestDriver.setIdealState(uniqClusterName, 2000, 50);
+    TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+
+    // drop resource group
+    ClusterSetup setup = new ClusterSetup(ZK_ADDR);
+    setup.dropResourceFromCluster(uniqClusterName, "TestDB0");
+
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", uniqClusterName, "TestDB0",
+        TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
+            "localhost_12921", "localhost_12922"), ZK_ADDR);
+
+    TestDriver.stopCluster(uniqClusterName);
+    System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // TODO add a test case that verifies that, in case of node failure, the best possible
+  // state is a subset of the ideal state
+}

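For reference, a minimal sketch of how a rebalanced resource can be switched into CUSTOMIZED ideal-state mode before a test drives its own partition assignments. It mirrors the accessor pattern used in TestDisable below; the helper name, the zkClient argument, and the cluster/resource names passed in are illustrative placeholders, not part of this patch.

    import org.apache.helix.PropertyKey.Builder;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.manager.zk.ZKHelixDataAccessor;
    import org.apache.helix.manager.zk.ZkBaseDataAccessor;
    import org.apache.helix.manager.zk.ZkClient;
    import org.apache.helix.model.IdealState;
    import org.apache.helix.model.IdealState.IdealStateModeProperty;

    // Hypothetical helper; zkClient is assumed to be an open ZkClient using ZNRecordSerializer.
    static void switchToCustomizedMode(ZkClient zkClient, String clusterName, String resourceName)
    {
      ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
      Builder keyBuilder = accessor.keyBuilder();

      // read the current ideal state, flip the mode to CUSTOMIZED, and write it back
      IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
      idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
    }

Once the mode is CUSTOMIZED, the controller honors whatever partition-to-state map is written into the ideal state, which is what TestDriver.setIdealState exercises above.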
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
new file mode 100644
index 0000000..3a29560
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
@@ -0,0 +1,403 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+
+public class TestDisable extends ZkIntegrationTestBase
+{
+
+  @Test
+  public void testDisableNodeCustomIS() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+    String disableNode = "localhost_12918";
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // set ideal state to customized mode
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disable localhost_12918
+    String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
+        " " + disableNode + " false";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // make sure localhost_12918 is in OFFLINE state
+    Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+    Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+    expectInstanceStateMap.put(disableNode, "OFFLINE");
+    expectStateMap.put(".*", expectInstanceStateMap);
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+    Assert.assertTrue(result, disableNode + " should be in OFFLINE");
+    
+    // re-enable localhost_12918
+    command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
+        " " + disableNode + " true";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // make sure localhost_12918 is NOT in OFFLINE state
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+    Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
+    
+    // clean up
+    // wait for all zk callbacks to complete
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+  
+  @Test
+  public void testDisableNodeAutoIS() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+    String disableNode = "localhost_12919";
+
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disable localhost_12919
+    String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
+        " " + disableNode + " false";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // make sure localhost_12919 is in OFFLINE state
+    Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+    Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+    expectInstanceStateMap.put(disableNode, "OFFLINE");
+    expectStateMap.put(".*", expectInstanceStateMap);
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+    Assert.assertTrue(result, disableNode + " should be in OFFLINE");
+    
+    // re-enable localhost_12919
+    command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
+        " " + disableNode + " true";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // make sure localhost_12919 is NOT in OFFLINE state
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+    Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
+    
+    // clean up
+    // wait for all zk callbacks to complete
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+  
+  @Test
+  public void testDisablePartitionCustomIS() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // set ideal state to customized mode
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disable [TestDB0_0, TestDB0_5] on localhost_12919
+    String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
+        " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // make sure localhost_12919 is in OFFLINE state for [TestDB0_0, TestDB0_5]
+    Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+    Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+    expectInstanceStateMap.put("localhost_12919", "OFFLINE");
+    expectStateMap.put("TestDB0_0", expectInstanceStateMap);
+    expectStateMap.put("TestDB0_5", expectInstanceStateMap);
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+    Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
+
+
+    // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
+    command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
+        " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+    Assert.assertTrue(result,  "localhost_12919" + " should NOT be in OFFLINE");
+
+    
+    // clean up
+    // wait for all zk callbacks to complete
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+  
+  @Test
+  public void testDisablePartitionAutoIS() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            8, // partitions per resource
+                            n, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+    
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disable [TestDB0_0, TestDB0_5] on localhost_12919
+    String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
+        " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    
+    // make sure localhost_12919 is in OFFLINE state for [TestDB0_0, TestDB0_5]
+    Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+    Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+    expectInstanceStateMap.put("localhost_12919", "OFFLINE");
+    expectStateMap.put("TestDB0_0", expectInstanceStateMap);
+    expectStateMap.put("TestDB0_5", expectInstanceStateMap);
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+    Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
+
+
+    // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
+    command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
+        " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
+    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+    Assert.assertTrue(result,  "localhost_12919" + " should NOT be in OFFLINE");
+
+    
+    // clean up
+    // wait for all zk callbacks to complete
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis())); 
+  }
+
+}

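The CLI-style --enableInstance / --enablePartition commands used in TestDisable can also be issued programmatically. Below is a minimal sketch using the same ZKHelixAdmin calls that TestDisableNode and TestDisablePartition exercise next; the helper name and the cluster, instance, and partition names are placeholders, and zkClient is assumed to be an open ZkClient using ZNRecordSerializer.

    import java.util.Arrays;

    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.manager.zk.ZkClient;

    // Hypothetical helper illustrating the admin API surface.
    static void toggleViaAdmin(ZkClient zkClient)
    {
      ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);

      // disable a whole instance, then re-enable it
      admin.enableInstance("MyCluster", "localhost_12918", false);
      admin.enableInstance("MyCluster", "localhost_12918", true);

      // disable two partitions of a resource on one instance, then re-enable them
      admin.enablePartition(false, "MyCluster", "localhost_12919", "TestDB",
          Arrays.asList("TestDB_0", "TestDB_9"));
      admin.enablePartition(true, "MyCluster", "localhost_12919", "TestDB",
          Arrays.asList("TestDB_0", "TestDB_9"));
    }

Either route should converge to the same external view, which the tests check with ClusterStateVerifier.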
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
new file mode 100644
index 0000000..ff0c535
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+
+  @Test()
+  public void testDisableNode() throws Exception
+  {
+    String command = "-zkSvr " + ZK_ADDR +" -enableInstance " + CLUSTER_NAME + " "+
+        PARTICIPANT_PREFIX + "_12918" + " TestDB TestDB_0 false";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    tool.enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_12918", true);
+    
+    result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
new file mode 100644
index 0000000..0ce5ce0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
+
+  @Test()
+  public void testDisablePartition() throws Exception
+  {
+    LOG.info("START testDisablePartition() at " + new Date(System.currentTimeMillis()));
+
+    // localhost_12919 is MASTER for TestDB_0
+    String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + CLUSTER_NAME +
+        " localhost_12919 TestDB TestDB_0 TestDB_9";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    Map<String, Set<String>> map = new HashMap<String, Set<String>>();
+    map.put("TestDB_0", TestHelper.setOf("localhost_12919"));
+    map.put("TestDB_9", TestHelper.setOf("localhost_12919"));
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
+
+    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    tool.enablePartition(true, CLUSTER_NAME, "localhost_12919", "TestDB", Arrays.asList("TestDB_9"));
+
+    result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    map.clear();
+    map.put("TestDB_0", TestHelper.setOf("localhost_12919"));
+    TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
+
+    map.clear();
+    map.put("TestDB_9", TestHelper.setOf("localhost_12919"));
+    TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "MASTER");
+
+    LOG.info("STOP testDisablePartition() at " + new Date(System.currentTimeMillis()));
+
+  }
+
+}