You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/13 20:41:14 UTC

[5/5] helix git commit: Properly remove clusters after each test, and clean up duplicated codes in tests and move them into base test classes.

Properly remove clusters after each test, and clean up duplicated codes in tests and move them into base test classes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0d5792b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0d5792b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0d5792b

Branch: refs/heads/master
Commit: c0d5792b745c67b6fee56ba79df02be89d1f049e
Parents: fe97055
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jun 7 17:15:54 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Jul 13 11:20:49 2018 -0700

----------------------------------------------------------------------
 .../BestPossibleExternalViewVerifier.java       |  13 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   | 442 +-----------
 .../org/apache/helix/common/ZkTestBase.java     | 691 +++++++++++++++++++
 .../stages/TestMessageThrottleStage.java        |   4 +-
 .../stages/TestRebalancePipeline.java           |   8 +-
 .../SinglePartitionLeaderStandByTest.java       |   5 +-
 .../helix/integration/TestAddClusterV2.java     |  23 +-
 .../TestAddNodeAfterControllerStart.java        |   6 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   5 +-
 .../TestAlertingRebalancerFailure.java          |  39 +-
 .../helix/integration/TestBasicSpectator.java   |   4 +-
 .../integration/TestBatchMessageHandling.java   |   6 +-
 .../integration/TestBucketizedResource.java     |   4 +-
 .../integration/TestCMWithFailParticipant.java  |   4 +-
 .../integration/TestCarryOverBadCurState.java   |   6 +-
 .../integration/TestCleanupExternalView.java    |   5 +-
 .../helix/integration/TestClusterStartsup.java  |  22 +-
 .../TestCorrectnessOnConnectivityLoss.java      |  12 +-
 .../apache/helix/integration/TestDisable.java   |   4 +-
 .../integration/TestDisableExternalView.java    |   4 +-
 .../helix/integration/TestDisableResource.java  |   2 +
 .../integration/TestDistributedCMMain.java      |   4 +-
 .../TestDistributedClusterController.java       |   4 +-
 .../apache/helix/integration/TestDriver.java    |   5 +-
 .../org/apache/helix/integration/TestDrop.java  |   4 +-
 .../helix/integration/TestDropResource.java     |   8 +-
 .../integration/TestEnableCompression.java      |   4 +-
 .../TestEnablePartitionDuringDisable.java       |   4 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   1 +
 .../helix/integration/TestErrorPartition.java   |   4 +-
 .../helix/integration/TestExpandCluster.java    |  22 +-
 .../integration/TestExternalViewUpdates.java    |   4 +-
 .../integration/TestHelixCustomCodeRunner.java  |   6 +-
 .../helix/integration/TestHelixInstanceTag.java |  12 +-
 .../TestHelixUsingDifferentParams.java          |   4 +-
 .../TestInvalidResourceRebalance.java           |   1 +
 .../helix/integration/TestNullReplica.java      |   5 +-
 .../TestPartitionLevelTransitionConstraint.java |   5 +-
 .../TestPartitionMovementThrottle.java          |  36 +-
 .../helix/integration/TestPauseSignal.java      |   4 +-
 .../TestRebalancerPersistAssignments.java       |  43 +-
 .../TestReelectedPipelineCorrectness.java       |   1 +
 .../helix/integration/TestRenamePartition.java  |   4 +-
 .../helix/integration/TestResetInstance.java    |   4 +-
 .../integration/TestResetPartitionState.java    |   5 +-
 .../helix/integration/TestResetResource.java    |   5 +-
 .../integration/TestResourceGroupEndtoEnd.java  |   6 +-
 .../TestResourceWithSamePartitionKey.java       |   1 +
 .../helix/integration/TestSchemataSM.java       |   4 +-
 .../TestSessionExpiryInTransition.java          |   4 +-
 .../TestStandAloneCMSessionExpiry.java          |   4 +-
 ...estStartMultipleControllersWithSameName.java |   5 +-
 .../TestStateTransitionCancellation.java        |  23 +-
 .../TestStateTransitionThrottle.java            |   8 +-
 .../helix/integration/TestStatusUpdate.java     |   4 +-
 .../helix/integration/TestSwapInstance.java     |  13 +-
 .../TestSyncSessionToController.java            |   4 +-
 .../TestWeightBasedRebalanceUtil.java           |  16 +-
 .../helix/integration/TestZkConnectionLost.java |  43 +-
 .../integration/common/IntegrationTest.java     |  38 -
 .../common/ZkIntegrationTestBase.java           | 322 ---------
 .../common/ZkStandAloneCMTestBase.java          |  36 +-
 .../TestClusterDataCacheSelectiveUpdate.java    |  12 +-
 .../controller/TestClusterMaintenanceMode.java  |  17 +-
 .../TestControllerLeadershipChange.java         |   4 +-
 .../controller/TestControllerLiveLock.java      |   1 +
 .../TestSkipBestPossibleCalculation.java        |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestHelixDataAccessor.java          |   7 +-
 .../manager/TestParticipantManager.java         |   4 +-
 .../integration/messaging/TestBatchMessage.java |  15 +-
 .../messaging/TestBatchMessageWrapper.java      |   1 +
 .../messaging/TestMessageThrottle.java          |   4 +-
 .../messaging/TestMessageThrottle2.java         |   4 +-
 .../messaging/TestP2PMessageSemiAuto.java       |  19 +-
 .../paticipant/TestInstanceAutoJoin.java        |   4 +-
 .../paticipant/TestNonOfflineInitState.java     |  21 +-
 .../paticipant/TestRestartParticipant.java      |   5 +-
 .../paticipant/TestStateTransitionTimeout.java  |  18 +-
 .../TestStateTransitionTimeoutWithResource.java |  30 +-
 .../TestCrushAutoRebalance.java                 |  81 +--
 .../TestCrushAutoRebalanceNonRack.java          |  73 +-
 ...rushAutoRebalanceTopoplogyAwareDisabled.java |  20 +-
 .../TestDelayedAutoRebalance.java               |  48 +-
 ...elayedAutoRebalanceWithDisabledInstance.java |  54 +-
 .../TestDelayedAutoRebalanceWithRackaware.java  |  11 +-
 .../PartitionMigration/TestExpandCluster.java   |   6 +-
 .../TestFullAutoMigration.java                  |   4 +-
 .../TestPartitionMigrationBase.java             |  43 +-
 .../rebalancer/TestAutoIsWithEmptyMap.java      |   5 +-
 .../rebalancer/TestAutoRebalance.java           |  30 +-
 .../TestAutoRebalancePartitionLimit.java        |  16 +-
 .../TestAutoRebalanceWithDisabledInstance.java  |  18 +-
 ...MaintenanceModeWhenReachingMaxPartition.java |   9 +-
 ...ceModeWhenReachingOfflineInstancesLimit.java |  10 +-
 .../rebalancer/TestCustomIdealState.java        |   4 +-
 .../TestCustomizedIdealStateRebalancer.java     |   8 +-
 .../rebalancer/TestFullAutoNodeTagging.java     |   9 +
 .../rebalancer/TestMixedModeAutoRebalance.java  |   8 +-
 .../rebalancer/TestSemiAutoRebalance.java       |  24 +-
 .../rebalancer/TestZeroReplicaAvoidance.java    |  19 +-
 .../spectator/TestRoutingTableProvider.java     |   4 +-
 ...stRoutingTableProviderFromCurrentStates.java |  47 +-
 .../TestRoutingTableProviderFromTargetEV.java   |  36 +-
 ...TestRoutingTableProviderPeriodicRefresh.java |   4 +-
 .../spectator/TestRoutingTableSnapshot.java     |  26 +-
 .../helix/integration/task/TaskTestBase.java    |  11 +-
 .../integration/task/TestBatchAddJobs.java      |  13 +-
 .../task/TestIndependentTaskRebalancer.java     |  10 +-
 .../helix/integration/task/TestJobFailure.java  |  10 +-
 .../task/TestJobFailureDependence.java          |   8 +-
 .../task/TestJobFailureHighThreshold.java       |   9 +-
 .../task/TestJobFailureTaskNotStarted.java      |  20 +-
 .../helix/integration/task/TestJobTimeout.java  |   9 +-
 .../task/TestJobTimeoutTaskNotStarted.java      |  10 +-
 .../task/TestRebalanceRunningTask.java          |  10 +-
 .../task/TestRunJobsWithMissingTarget.java      |   4 +-
 .../integration/task/TestTaskAssignment.java    |   2 +-
 .../task/TestTaskRebalancerParallel.java        |   4 -
 .../integration/task/TestTaskThreadLeak.java    |   4 +-
 .../integration/task/TestTaskThrottling.java    |   6 +-
 .../task/TestTaskWithInstanceDisabled.java      |   2 +-
 .../integration/task/TestUserContentStore.java  |   9 +-
 .../helix/manager/zk/TestHandleNewSession.java  |   5 +-
 .../manager/zk/TestZkBaseDataAccessor.java      |  91 +--
 .../zk/TestZkManagerFlappingDetection.java      |   4 +-
 .../handling/TestBatchMessageModeConfigs.java   |  14 +-
 .../handling/TestResourceThreadpoolSize.java    |  24 +-
 .../TestClusterStatusMonitorLifecycle.java      |  23 +-
 .../mbeans/TestClusterAggregateMetrics.java     |   4 +-
 .../mbeans/TestDisableResourceMbean.java        |   1 +
 .../mbeans/TestDropResourceMetricsReset.java    |   1 +
 .../mbeans/TestResetClusterMetrics.java         |   1 +
 .../participant/TestDistControllerElection.java |  16 +-
 .../TestDistControllerStateModel.java           |   8 +
 .../helix/spectator/TestRoutingDataCache.java   |  14 +-
 .../helix/task/TaskSynchronizedTestBase.java    |  66 +-
 ...signableInstanceManagerControllerSwitch.java |   2 +-
 .../helix/task/TestJobStateOnCreation.java      |   8 +-
 .../helix/task/TestSemiAutoStateTransition.java |  12 +-
 .../apache/helix/tools/TestClusterSetup.java    |  11 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |   4 +-
 142 files changed, 1561 insertions(+), 1719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 4abfd07..fa5d29f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -82,18 +82,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     _expectLiveInstances = expectLiveInstances;
     _clusterDataCache = new ClusterDataCache();
   }
-
-  public static void main (String [] args) {
-    Set<String> resources = Collections.singleton("SyncColoTestDB");
-    BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder("ESPRESSO_MT1")
-            .setZkAddr("zk-ltx1-espresso.stg.linkedin.com:12913")
-            .setResources(resources)
-            .build();
-
-    verifier.verify();
-  }
-
+  
   public static class Builder {
     private String _clusterName;
     private Map<String, Map<String, String>> _errStates;

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 483f0af..5757b24 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -19,445 +19,15 @@ package org.apache.helix;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
-import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.helix.util.ZKClientPool;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
+import org.apache.helix.common.ZkTestBase;
 
 // TODO merge code with ZkIntegrationTestBase
-public class ZkUnitTestBase {
-  private static Logger LOG = LoggerFactory.getLogger(ZkUnitTestBase.class);
-  protected static ZkServer _zkServer = null;
-  protected static ZkClient _gZkClient;
-
-  public static final String ZK_ADDR = "localhost:2185";
-  protected static final String CLUSTER_PREFIX = "CLUSTER";
-  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
-
-  @BeforeSuite(alwaysRun = true)
-  public void beforeSuite() throws Exception {
-    // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
-    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
-
-    _zkServer = TestHelper.startZkServer(ZK_ADDR);
-    AssertJUnit.assertTrue(_zkServer != null);
-    ZKClientPool.reset();
-
-    // System.out.println("Number of open zkClient before ZkUnitTests: "
-    // + ZkClient.getNumberOfConnections());
-
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterSuite(alwaysRun = true)
-  public void afterSuite() {
-    _gZkClient.close();
-    TestHelper.stopZkServer(_zkServer);
-    _zkServer = null;
-
-    // System.out.println("Number of open zkClient after ZkUnitTests: "
-    // + ZkClient.getNumberOfConnections());
-
-  }
-
-  protected String getShortClassName() {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
-  }
-
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
-    String leaderPath = PropertyPathBuilder.controllerLeader(clusterName);
-    ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
-    if (leaderRecord == null) {
-      return null;
-    }
-
-    String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString());
-    return leader;
-  }
-
-  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
-      Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
-    String leader = getCurrentLeader(zkClient, clusterName);
-    Assert.assertTrue(leader != null);
-    System.out.println("stop leader:" + leader + " in " + clusterName);
-    Assert.assertTrue(leader != null);
-
-    HelixManager manager = managerMap.remove(leader);
-    Assert.assertTrue(manager != null);
-    manager.disconnect();
-
-    Thread thread = threadMap.remove(leader);
-    Assert.assertTrue(thread != null);
-    thread.interrupt();
-
-    boolean isNewLeaderElected = false;
-    try {
-      // Thread.sleep(2000);
-      for (int i = 0; i < 5; i++) {
-        Thread.sleep(1000);
-        String newLeader = getCurrentLeader(zkClient, clusterName);
-        if (!newLeader.equals(leader)) {
-          isNewLeaderElected = true;
-          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
-          break;
-        }
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    if (isNewLeaderElected == false) {
-      System.out.println("fail to elect a new leader elected in " + clusterName);
-    }
-    AssertJUnit.assertTrue(isNewLeaderElected);
-  }
-
-  public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
-      boolean wantExists) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
-    String instanceConfigPath = instanceConfigsPath + "/" + instance;
-    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
-  }
-
-  public void verifyResource(ZkClient zkClient, String clusterName, String resource,
-      boolean wantExists) {
-    String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
-  }
-
-  public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
-      boolean wantEnabled) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
-    AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
-  }
-
-  public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
-    for (String partitionName : idealState.getPartitionSet()) {
-      if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
-      } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size());
-      }
-    }
-  }
-
-  protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
-      InterruptedException {
-    ZooKeeper oldZookeeper = zkConnection.getZookeeper();
-    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
-
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        LOG.info("In New connection, process event:" + event);
-      }
-    };
-
-    ZooKeeper newZookeeper =
-        new ZooKeeper(zkConnection.getServers(), oldZookeeper.getSessionTimeout(), watcher,
-            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
-    LOG.info("New sessionId = " + newZookeeper.getSessionId());
-    // Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    oldZookeeper = zkConnection.getZookeeper();
-    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
-  }
-
-  protected void simulateSessionExpiry(ZkClient zkClient) throws IOException, InterruptedException {
-    IZkStateListener listener = new IZkStateListener() {
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        LOG.info("In Old connection, state changed:" + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        LOG.info("In Old connection, new session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
-      }
-    };
-    zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
-    ZooKeeper oldZookeeper = connection.getZookeeper();
-    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
-
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        LOG.info("In New connection, process event:" + event);
-      }
-    };
-
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), oldZookeeper.getSessionTimeout(), watcher,
-            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
-    LOG.info("New sessionId = " + newZookeeper.getSessionId());
-    // Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = (ZkConnection) zkClient.getConnection();
-    oldZookeeper = connection.getZookeeper();
-    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
-  }
-
-  protected void setupStateModel(String clusterName) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    StateModelDefinition masterSlave =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
-    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
-
-    StateModelDefinition leaderStandby =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
-    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
-
-    StateModelDefinition onlineOffline =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
-    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
-
-  }
-
-  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
-      int partitions, int replicas) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    List<IdealState> idealStates = new ArrayList<IdealState>();
-    List<String> instances = new ArrayList<String>();
-    for (int i : nodes) {
-      instances.add("localhost_" + i);
-    }
-
-    for (String resourceName : resources) {
-      IdealState idealState = new IdealState(resourceName);
-      for (int p = 0; p < partitions; p++) {
-        List<String> value = new ArrayList<String>();
-        for (int r = 0; r < replicas; r++) {
-          int n = nodes[(p + r) % nodes.length];
-          value.add("localhost_" + n);
-        }
-        idealState.getRecord().setListField(resourceName + "_" + p, value);
-      }
-
-      idealState.setReplicas(Integer.toString(replicas));
-      idealState.setStateModelDefRef("MasterSlave");
-      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-      idealState.setNumPartitions(partitions);
-      idealStates.add(idealState);
-
-      // System.out.println(idealState);
-      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
-    }
-    return idealStates;
-  }
-
-  protected void setupLiveInstances(String clusterName, int[] liveInstances) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    for (int i = 0; i < liveInstances.length; i++) {
-      String instance = "localhost_" + liveInstances[i];
-      LiveInstance liveInstance = new LiveInstance(instance);
-      liveInstance.setSessionId("session_" + liveInstances[i]);
-      liveInstance.setHelixVersion("0.0.0");
-      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
-    }
-  }
-
-  protected void setupInstances(String clusterName, int[] instances) {
-    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
-    for (int i = 0; i < instances.length; i++) {
-      String instance = "localhost_" + instances[i];
-      InstanceConfig instanceConfig = new InstanceConfig(instance);
-      instanceConfig.setHostName("localhost");
-      instanceConfig.setPort("" + instances[i]);
-      instanceConfig.setInstanceEnabled(true);
-      admin.addInstance(clusterName, instanceConfig);
-    }
-  }
-
-  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
-    try {
-      pipeline.handle(event);
-      pipeline.finish();
-    } catch (Exception e) {
-      LOG.error("Exception while executing pipeline:" + pipeline
-          + ". Will not continue to next pipeline", e);
-    }
-  }
-
-  protected void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-
-    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in
-    // execute() function call
-    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
-    // to a shared library
-    if (stage instanceof AbstractAsyncBaseStage) {
-      ((AbstractAsyncBaseStage) stage).execute(event);
-    } else {
-      stage.process(event);
-    }
-    stage.postProcess();
-  }
-
-  protected Message createMessage(MessageType type, String msgId, String fromState, String toState,
-      String resourceName, String tgtName) {
-    Message msg = new Message(type.toString(), msgId);
-    msg.setFromState(fromState);
-    msg.setToState(toState);
-    msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
-    msg.setTgtName(tgtName);
-    return msg;
-  }
-
-  /**
-   * Poll for the existence (or lack thereof) of a specific Helix property
-   * @param clazz the HelixProeprty subclass
-   * @param accessor connected HelixDataAccessor
-   * @param key the property key to look up
-   * @param shouldExist true if the property should exist, false otherwise
-   * @return the property if found, or null if it does not exist
-   */
-  protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor,
-      PropertyKey key, boolean shouldExist) throws InterruptedException {
-    final int POLL_TIMEOUT = 5000;
-    final int POLL_INTERVAL = 50;
-    T property = accessor.getProperty(key);
-    int timeWaited = 0;
-    while (((shouldExist && property == null) || (!shouldExist && property != null))
-        && timeWaited < POLL_TIMEOUT) {
-      Thread.sleep(POLL_INTERVAL);
-      timeWaited += POLL_INTERVAL;
-      property = accessor.getProperty(key);
-    }
-    return property;
-  }
-
-  /**
-   * Ensures that external view and current state are empty
-   */
-  protected static class EmptyZkVerifier implements ZkVerifier {
-    private final String _clusterName;
-    private final String _resourceName;
-    private final ZkClient _zkClient;
-
-    /**
-     * Instantiate the verifier
-     * @param clusterName the cluster to verify
-     * @param resourceName the resource to verify
-     */
-    public EmptyZkVerifier(String clusterName, String resourceName) {
-      _clusterName = clusterName;
-      _resourceName = resourceName;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
-    }
-
-    @Override
-    public boolean verify() {
-      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
-      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
-
-      // verify external view empty
-      if (externalView != null) {
-        for (String partition : externalView.getPartitionSet()) {
-          Map<String, String> stateMap = externalView.getStateMap(partition);
-          if (stateMap != null && !stateMap.isEmpty()) {
-            LOG.error("External view not empty for " + partition);
-            return false;
-          }
-        }
-      }
-
-      // verify current state empty
-      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
-      for (String participant : liveParticipants) {
-        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
-        for (String sessionId : sessionIds) {
-          CurrentState currentState =
-              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
-          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
-          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
-            LOG.error("Current state not empty for " + participant);
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public ZkClient getZkClient() {
-      return _zkClient;
-    }
+public class ZkUnitTestBase extends ZkTestBase {
 
-    @Override
-    public String getClusterName() {
-      return _clusterName;
+  protected void deleteCluster(String clusterName) {
+    String namespace = "/" + clusterName;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
new file mode 100644
index 0000000..7d98119
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -0,0 +1,691 @@
+package org.apache.helix.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+
+public class ZkTestBase {
+  private static Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
+
+  protected static ZkServer _zkServer;
+  protected static ZkClient _gZkClient;
+  protected static ClusterSetup _gSetupTool;
+  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
+
+  public static final String ZK_ADDR = "localhost:2183";
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+  protected final String CONTROLLER_PREFIX = "controller";
+  protected final String PARTICIPANT_PREFIX = "localhost";
+
+
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    // TODO: use logging.properties file to config java.util.logging.Logger levels
+    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+    topJavaLogger.setLevel(Level.WARNING);
+
+    // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
+
+    _zkServer = TestHelper.startZkServer(ZK_ADDR);
+    AssertJUnit.assertTrue(_zkServer != null);
+    ZKClientPool.reset();
+
+    _gZkClient = new ZkClient(ZK_ADDR);
+    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+    _gSetupTool = new ClusterSetup(_gZkClient);
+    _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
+  }
+
+  @AfterSuite
+  public void afterSuite() {
+    ZKClientPool.reset();
+    _gZkClient.close();
+    TestHelper.stopZkServer(_zkServer);
+  }
+
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext){
+    long startTime = System.currentTimeMillis();
+    System.out.println("START " + testMethod.getName() + " at " + new Date(startTime));
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+  }
+
+  @AfterMethod
+  public void endTest(Method testMethod, ITestContext testContext) {
+    Long startTime = (Long) testContext.getAttribute("StartTime");
+    long endTime = System.currentTimeMillis();
+    System.out.println(
+        "END " + testMethod.getName() + " at " + new Date(endTime) + ", took: " + (endTime
+            - startTime) + "ms.");
+  }
+
+
+  protected String getShortClassName() {
+    return this.getClass().getSimpleName();
+  }
+
+  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader == null) {
+      return null;
+    }
+    return leader.getInstanceName();
+  }
+
+  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
+      Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
+    String leader = getCurrentLeader(zkClient, clusterName);
+    Assert.assertTrue(leader != null);
+    System.out.println("stop leader:" + leader + " in " + clusterName);
+    Assert.assertTrue(leader != null);
+
+    HelixManager manager = managerMap.remove(leader);
+    Assert.assertTrue(manager != null);
+    manager.disconnect();
+
+    Thread thread = threadMap.remove(leader);
+    Assert.assertTrue(thread != null);
+    thread.interrupt();
+
+    boolean isNewLeaderElected = false;
+    try {
+      // Thread.sleep(2000);
+      for (int i = 0; i < 5; i++) {
+        Thread.sleep(1000);
+        String newLeader = getCurrentLeader(zkClient, clusterName);
+        if (!newLeader.equals(leader)) {
+          isNewLeaderElected = true;
+          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+          break;
+        }
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    if (isNewLeaderElected == false) {
+      System.out.println("fail to elect a new leader elected in " + clusterName);
+    }
+    AssertJUnit.assertTrue(isNewLeaderElected);
+  }
+
+  protected void enableHealthCheck(String clusterName) {
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
+    new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
+  }
+
+  protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistBestPossibleAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistIntermediateAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setTopologyAwareEnabled(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+      boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setDelayRebalaceEnabled(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName,
+      String instanceName, boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+    instanceConfig.setDelayRebalanceEnabled(enabled);
+    configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);
+  }
+
+  protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setRebalanceDelayTime(delay);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, AutoRebalanceStrategy.class.getName());
+  }
+
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    if (idealState == null) {
+      _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+          IdealState.RebalanceMode.FULL_AUTO + "", rebalanceStrategy);
+    }
+
+    idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    idealState.setMinActiveReplicas(minActiveReplica);
+    if (!idealState.isDelayRebalanceEnabled()) {
+      idealState.setDelayRebalanceEnabled(true);
+    }
+    if (delay > 0) {
+      idealState.setRebalanceDelay(delay);
+    }
+    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
+    _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
+    idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+
+    return idealState;
+  }
+
+  protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+      List<String> instanceNames, int numPartition, int replica, String rebalanceMode,
+      String stateModelDef) {
+    IdealState is = _gSetupTool
+        .createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag, numPartition,
+            replica, rebalanceMode, stateModelDef);
+
+    // setup initial partition->instance mapping.
+    int nodeIdx = 0;
+    int numNode = instanceNames.size();
+    assert (numNode >= replica);
+    for (int i = 0; i < numPartition; i++) {
+      String partitionName = resourceGroupName + "_" + i;
+      for (int j = 0; j < replica; j++) {
+        is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode),
+            OnlineOfflineSMD.States.ONLINE.toString());
+      }
+      nodeIdx++;
+    }
+
+    return is;
+  }
+
+  protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName,
+      List<String> preferenceList, String stateModelDef, int numPartition, int replica) {
+    clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef,
+        IdealState.RebalanceMode.SEMI_AUTO.toString());
+    clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica);
+
+    IdealState is =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, dbName);
+    for (String p : is.getPartitionSet()) {
+      is.setPreferenceList(p, preferenceList);
+    }
+    clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
+  }
+
+  /**
+   * Validate there should be always minimal active replica and top state replica for each partition.
+   * Also make sure there is always some partitions with only active replica count.
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica, int numNodes) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.valueOf(is.getReplicas());
+
+    Map<String, Integer> stateCount =
+        stateModelDef.getStateCountMap(numNodes, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertTrue(!assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+
+      if (activeReplica < minActiveReplica) {
+        int a = 0;
+      }
+
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica, String
+          .format("%s has less active replica %d then required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception {
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+
+    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in
+    // execute() function call
+    if (stage instanceof AbstractAsyncBaseStage) {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    } else {
+      stage.process(event);
+    }
+    stage.postProcess();
+  }
+
+  public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
+      boolean wantExists) {
+    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
+    String instanceConfigPath = instanceConfigsPath + "/" + instance;
+    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
+  }
+
+  public void verifyResource(ZkClient zkClient, String clusterName, String resource,
+      boolean wantExists) {
+    String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
+  }
+
+  public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
+      boolean wantEnabled) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
+    AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
+  }
+
+  public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
+    for (String partitionName : idealState.getPartitionSet()) {
+      if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) {
+        AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
+      } else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED) {
+        AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size());
+      }
+    }
+  }
+
+  protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
+      InterruptedException {
+    ZooKeeper oldZookeeper = zkConnection.getZookeeper();
+    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        LOG.info("In New connection, process event:" + event);
+      }
+    };
+
+    ZooKeeper newZookeeper =
+        new ZooKeeper(zkConnection.getServers(), oldZookeeper.getSessionTimeout(), watcher,
+            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
+    LOG.info("New sessionId = " + newZookeeper.getSessionId());
+    // Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    oldZookeeper = zkConnection.getZookeeper();
+    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+  }
+
+  protected void simulateSessionExpiry(ZkClient zkClient)
+      throws IOException, InterruptedException, IOException {
+    IZkStateListener listener = new IZkStateListener() {
+      @Override
+      public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+        LOG.info("In Old connection, state changed:" + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        LOG.info("In Old connection, new session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+      }
+    };
+    zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+    ZooKeeper oldZookeeper = connection.getZookeeper();
+    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        LOG.info("In New connection, process event:" + event);
+      }
+    };
+
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), oldZookeeper.getSessionTimeout(), watcher,
+            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
+    LOG.info("New sessionId = " + newZookeeper.getSessionId());
+    // Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = (ZkConnection) zkClient.getConnection();
+    oldZookeeper = connection.getZookeeper();
+    LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+  }
+
+  protected void setupStateModel(String clusterName) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
+
+    StateModelDefinition leaderStandby =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
+
+    StateModelDefinition onlineOffline =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
+
+  }
+
+  protected Message createMessage(Message.MessageType type, String msgId, String fromState, String toState,
+      String resourceName, String tgtName) {
+    Message msg = new Message(type.toString(), msgId);
+    msg.setFromState(fromState);
+    msg.setToState(toState);
+    msg.getRecord().setSimpleField(Message.Attributes.RESOURCE_NAME.toString(), resourceName);
+    msg.setTgtName(tgtName);
+    return msg;
+  }
+
+  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
+      int partitions, int replicas) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    List<IdealState> idealStates = new ArrayList<IdealState>();
+    List<String> instances = new ArrayList<String>();
+    for (int i : nodes) {
+      instances.add("localhost_" + i);
+    }
+
+    for (String resourceName : resources) {
+      IdealState idealState = new IdealState(resourceName);
+      for (int p = 0; p < partitions; p++) {
+        List<String> value = new ArrayList<String>();
+        for (int r = 0; r < replicas; r++) {
+          int n = nodes[(p + r) % nodes.length];
+          value.add("localhost_" + n);
+        }
+        idealState.getRecord().setListField(resourceName + "_" + p, value);
+      }
+
+      idealState.setReplicas(Integer.toString(replicas));
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+      // System.out.println(idealState);
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  protected void setupLiveInstances(String clusterName, int[] liveInstances) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    for (int i = 0; i < liveInstances.length; i++) {
+      String instance = "localhost_" + liveInstances[i];
+      LiveInstance liveInstance = new LiveInstance(instance);
+      liveInstance.setSessionId("session_" + liveInstances[i]);
+      liveInstance.setHelixVersion("0.0.0");
+      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+    }
+  }
+
+  protected void setupInstances(String clusterName, int[] instances) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    for (int i = 0; i < instances.length; i++) {
+      String instance = "localhost_" + instances[i];
+      InstanceConfig instanceConfig = new InstanceConfig(instance);
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort("" + instances[i]);
+      instanceConfig.setInstanceEnabled(true);
+      admin.addInstance(clusterName, instanceConfig);
+    }
+  }
+
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+    try {
+      pipeline.handle(event);
+      pipeline.finish();
+    } catch (Exception e) {
+      LOG.error("Exception while executing pipeline:" + pipeline
+          + ". Will not continue to next pipeline", e);
+    }
+  }
+
+  protected void runStage(ClusterEvent event, Stage stage) throws Exception {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+
+    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in
+    // execute() function call
+    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
+    // to a shared library
+    if (stage instanceof AbstractAsyncBaseStage) {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    } else {
+      stage.process(event);
+    }
+    stage.postProcess();
+  }
+
+  /**
+   * Poll for the existence (or lack thereof) of a specific Helix property
+   * @param clazz the HelixProeprty subclass
+   * @param accessor connected HelixDataAccessor
+   * @param key the property key to look up
+   * @param shouldExist true if the property should exist, false otherwise
+   * @return the property if found, or null if it does not exist
+   */
+  protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor,
+      PropertyKey key, boolean shouldExist) throws InterruptedException {
+    final int POLL_TIMEOUT = 5000;
+    final int POLL_INTERVAL = 50;
+    T property = accessor.getProperty(key);
+    int timeWaited = 0;
+    while (((shouldExist && property == null) || (!shouldExist && property != null))
+        && timeWaited < POLL_TIMEOUT) {
+      Thread.sleep(POLL_INTERVAL);
+      timeWaited += POLL_INTERVAL;
+      property = accessor.getProperty(key);
+    }
+    return property;
+  }
+
+  /**
+   * Ensures that external view and current state are empty
+   */
+  protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier {
+    private final String _clusterName;
+    private final String _resourceName;
+    private final ZkClient _zkClient;
+
+    /**
+     * Instantiate the verifier
+     * @param clusterName the cluster to verify
+     * @param resourceName the resource to verify
+     */
+    public EmptyZkVerifier(String clusterName, String resourceName) {
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+    }
+
+    @Override
+    public boolean verify() {
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+      // verify external view empty
+      if (externalView != null) {
+        for (String partition : externalView.getPartitionSet()) {
+          Map<String, String> stateMap = externalView.getStateMap(partition);
+          if (stateMap != null && !stateMap.isEmpty()) {
+            LOG.error("External view not empty for " + partition);
+            return false;
+          }
+        }
+      }
+
+      // verify current state empty
+      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
+      for (String participant : liveParticipants) {
+        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
+        for (String sessionId : sessionIds) {
+          CurrentState currentState =
+              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
+          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+            LOG.error("Current state not empty for " + participant);
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return _zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return _clusterName;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 9054a1d..ab62676 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -119,8 +119,8 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
         1);
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test()
@@ -310,8 +310,8 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     Assert.assertTrue(throttleMessages.contains(msg3));
     Assert.assertTrue(throttleMessages.contains(msg4));
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   private boolean containsConstraint(Set<ConstraintItem> constraints, ConstraintItem constraint) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d38907d..90363c1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -118,8 +118,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -227,8 +227,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
     Assert.assertTrue(messages.isEmpty());
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -323,6 +323,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Assert.assertEquals(message.getToState(), "DROPPED");
     Assert.assertEquals(message.getTgtName(), "localhost_0");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
@@ -398,8 +399,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -464,6 +465,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Assert.assertEquals(message.getToState(), "SLAVE");
     Assert.assertEquals(message.getTgtName(), "localhost_1");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
index 5701211..00509b3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
@@ -25,7 +25,7 @@ import java.util.Date;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
  * which helps us write integration tests easily
  */
 
-public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase {
+public class SinglePartitionLeaderStandByTest extends ZkTestBase {
   @Test
   public void test()
       throws Exception {
@@ -99,6 +99,7 @@ public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase {
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
+    TestHelper.dropCluster(clusterName, _gZkClient);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 8be880d..6aca358 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -21,7 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
@@ -33,7 +33,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAddClusterV2 extends ZkIntegrationTestBase {
+public class TestAddClusterV2 extends ZkTestBase {
   private static Logger LOG = LoggerFactory.getLogger(TestAddClusterV2.class);
 
   protected static final int CLUSTER_NR = 10;
@@ -54,18 +54,6 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CONTROLLER_CLUSTER;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-
-    for (int i = 0; i < CLUSTER_NR; i++) {
-      namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
-      if (_gZkClient.exists(namespace)) {
-        _gZkClient.deleteRecursively(namespace);
-      }
-    }
-
     _setupTool = new ClusterSetup(ZK_ADDR);
 
     // setup CONTROLLER_CLUSTER
@@ -136,6 +124,13 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].syncStop();
     }
+
+    // delete clusters
+    for (int i = 0; i < CLUSTER_NR; i++) {
+      String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+      _setupTool.getClusterManagementTool().dropCluster(clusterName);
+    }
+
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index 7e83be2..1d5273a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
+public class TestAddNodeAfterControllerStart extends ZkTestBase {
   private static Logger LOG = LoggerFactory.getLogger(TestAddNodeAfterControllerStart.class);
   final String className = getShortClassName();
 
@@ -88,6 +88,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     for (int i = 0; i < nodeNr; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -164,6 +165,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     for (int i = 0; i < nodeNr; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index b05b9fb..6209815 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -39,7 +39,7 @@ import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase {
+public class TestAddStateModelFactoryAfterConnect extends ZkTestBase {
   @Test
   public void testBasic() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -130,6 +130,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index 946eb1c..cd53552 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -68,17 +68,12 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
     // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
 
     for (int i = 0; i < NODE_NR; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
     // start controller
@@ -108,19 +103,19 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
   @Test (enabled = false)
   public void testParticipantUnavailable() {
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
     HelixClusterVerifier verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(new HashSet<>(Collections.singleton(testDb))).build();
     Assert.assertTrue(verifier.verify());
 
     // disable then enable the resource to ensure no rebalancing error is generated during this process
-    _setupTool.dropResourceFromCluster(CLUSTER_NAME, testDb);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, testDb);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
     Assert.assertTrue(verifier.verify());
 
     // Verify there is no rebalance error logged
@@ -137,7 +132,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     checkRebalanceFailureGauge(true);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i] =
           new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[i].getInstanceName());
@@ -147,21 +142,21 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
   @Test (enabled = false)
   public void testTagSetIncorrect() {
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
     // set expected instance tag
     IdealState is =
-        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
     is.setInstanceGroupTag("RandomTag");
-    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, is);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, is);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
     // Verify there is rebalance error logged
     Assert.assertNotNull(pollForError(accessor, errorNodeKey));
     checkRebalanceFailureGauge(true);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
   }
 
   @Test (enabled = false)
@@ -188,10 +183,10 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     // Error may be recorded unexpectedly when a resource from other tests is not cleaned up.
     accessor.removeProperty(errorNodeKey);
 
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name(),
         CrushRebalanceStrategy.class.getName());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
     HelixClusterVerifier verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(new HashSet<>(Collections.singleton(testDb))).build();
@@ -212,13 +207,13 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     for (int i = replicas; i < NODE_NR; i++) {
       setDomainId(_participants[i].getInstanceName(), configAccessor);
     }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
     Thread.sleep(1000);
     // Verify that rebalance error state is removed
     checkRebalanceFailureGauge(false);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
     clusterConfig.setTopologyAwareEnabled(false);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
index 0c61b27..2fa8c9f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
@@ -46,8 +46,8 @@ public class TestBasicSpectator extends ZkStandAloneCMTestBase implements
     relayHelixManager.connect();
     relayHelixManager.addExternalViewChangeListener(this);
 
-    _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
 
     boolean result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
index 3feb2e0..d7c3a2d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
@@ -54,15 +54,15 @@ public class TestBatchMessageHandling extends ZkStandAloneCMTestBase {
     IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
         .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
     idealState.setBatchMessageMode(true);
-    _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
 
     Thread.sleep(1000L);
 
     int numOfOnlines = 0;
     int numOfErrors = 0;
     ExternalView externalView =
-        _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
     for (String partition : externalView.getPartitionSet()) {
       if (externalView.getStateMap(partition).values().contains("ONLINE")) {
         numOfOnlines++;

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 1b764d4..7c90e5f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -30,7 +30,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -45,7 +45,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestBucketizedResource extends ZkIntegrationTestBase {
+public class TestBucketizedResource extends ZkTestBase {
 
   private void setupCluster(String clusterName, List<String> instanceNames, String dbName,
       int replica, int partitions, int bucketSize) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
index 3c3f4ac..7dd477a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
@@ -21,10 +21,10 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.testng.annotations.Test;
 
-public class TestCMWithFailParticipant extends ZkIntegrationTestBase {
+public class TestCMWithFailParticipant extends ZkTestBase {
   // ZkClient _zkClient;
   //
   // @BeforeClass ()

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
index 05b3e99..e01afc9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -24,7 +24,7 @@ import java.util.Date;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,7 +33,7 @@ import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
+public class TestCarryOverBadCurState extends ZkTestBase {
   @Test
   public void testCarryOverBadCurState() throws Exception {
     System.out.println("START testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
@@ -83,7 +83,7 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
     System.out.println("END testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
-
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
index e7a7ea0..61aadde 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
@@ -98,8 +98,8 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     accessor.removeProperty(keyBuilder.currentState("localhost_12918", liveInstance.getSessionId(),
         "TestDB0"));
     liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12919"));
-    accessor.removeProperty(keyBuilder.currentState("localhost_12919", liveInstance.getSessionId(),
-        "TestDB0"));
+    accessor.removeProperty(
+        keyBuilder.currentState("localhost_12919", liveInstance.getSessionId(), "TestDB0"));
 
     // re-enable controller shall remove orphan external-view
     // System.out.println("re-enabling controller");
@@ -123,6 +123,7 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
+    TestHelper.dropCluster(clusterName, _gZkClient);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
index 21a9ac2..9d5ad3e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -38,20 +38,14 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
   void setupCluster() throws HelixException {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
-
     // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
     for (int i = 0; i < NODE_NR; i++) {
       String storageNodeName = "localhost_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
   }
 
   @Override
@@ -60,11 +54,6 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
 
   }
 
-  @Override
-  @AfterClass()
-  public void afterClass() {
-  }
-
   @Test()
   public void testParticipantStartUp() throws Exception {
     setupCluster();
@@ -135,5 +124,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
       AssertJUnit.assertFalse(manager.isConnected());
     }
 
+    if (manager != null) {
+      manager.disconnect();
+    }
   }
 }