You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:14 UTC

[18/33] helix git commit: Refactor tests with TaskTestBase and remove duplicated code.

Refactor tests with TaskTestBase and remove duplicated code.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/99a40083
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/99a40083
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/99a40083

Branch: refs/heads/helix-0.6.x
Commit: 99a40083ab7bc1e1480d66107fff83f0479fa068
Parents: 9f80206
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Apr 12 11:48:54 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 15:02:05 2016 -0700

----------------------------------------------------------------------
 .../helix/integration/task/TestGenericJobs.java |  99 +------------
 .../task/TestIndependentTaskRebalancer.java     |  19 +--
 .../integration/task/TestRecurringJobQueue.java | 113 +--------------
 .../task/TestRunJobsWithMissingTarget.java      | 138 +++----------------
 .../integration/task/TestTaskRebalancer.java    | 102 +-------------
 .../task/TestTaskRebalancerFailover.java        |  97 +------------
 .../task/TestTaskRebalancerParallel.java        | 106 +-------------
 .../task/TestTaskRebalancerRetryLimit.java      |  45 ++----
 .../task/TestTaskRebalancerStopResume.java      | 106 +-------------
 .../integration/task/TestUpdateWorkflow.java    | 120 +---------------
 10 files changed, 58 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
index d96acd9..426bade 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
@@ -19,110 +19,19 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestGenericJobs extends ZkIntegrationTestBase {
+public class TestGenericJobs extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestGenericJobs.class);
-  private static final int num_nodes = 5;
-  private static final int START_PORT = 12918;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
-  private ClusterControllerManager _controller;
-  private ClusterSetup _setupTool;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
-
-  @BeforeClass public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < num_nodes; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < num_nodes; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
-    boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < num_nodes; i++) {
-      _participants[i].syncStop();
-    }
-    _setupTool.deleteCluster(CLUSTER_NAME);
-  }
 
   @Test public void testGenericJobs() throws Exception {
     String queueName = TestHelper.getTestMethodName();

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 046281e..0e598c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -58,17 +58,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
+public class TestIndependentTaskRebalancer extends TaskTestBase {
   private Set<String> _invokedClasses = Sets.newHashSet();
   private Map<String, Integer> _runCounts = Maps.newHashMap();
 
-  private HelixManager _manager;
-  private TaskDriver _driver;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -80,14 +73,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     // Setup cluster and instances
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
       setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
     // start dummy participants
-    for (int i = 0; i < n; i++) {
-      final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
 
       // Set task callbacks
       Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
@@ -226,7 +219,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
     Map<String, String> taskConfigMap = Maps.newHashMap(
-        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
+        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index b2e61ca..8262b9b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -20,140 +20,29 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TargetState;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
-public class TestRecurringJobQueue extends ZkIntegrationTestBase {
+public class TestRecurringJobQueue extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestRecurringJobQueue.class);
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
   private static final String TIMEOUT_CONFIG = "Timeout";
-  private static final String TGT_DB = "TestDB";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
-  private ZKHelixDataAccessor _accessor;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    _accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
-    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
-          taskFactoryReg));
-
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
-            ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < n; i++) {
-      _participants[i].syncStop();
-    }
-  }
-
-
 
   @Test
   public void deleteRecreateRecurrentQueue() throws Exception {

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index bd05f81..5a07942 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -19,128 +19,26 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import com.google.common.collect.Sets;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Sets;
 
-public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
+public class TestRunJobsWithMissingTarget extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class);
-  private static final int num_nodes = 5;
-  private static final int num_dbs = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
-  private ClusterControllerManager _controller;
-  private ClusterSetup _setupTool;
-
-  private List<String> _test_dbs = new ArrayList<String>();
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < num_nodes; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // Set up target dbs
-    for (int i = 0; i < num_dbs; i++) {
-      String db = "TestDB" + i;
-      _setupTool
-          .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
-              IdealState.RebalanceMode.FULL_AUTO.toString());
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
-      _test_dbs.add(db);
-    }
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < num_nodes; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
-    boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < num_nodes; i++) {
-      _participants[i].syncStop();
-    }
-    _setupTool.deleteCluster(CLUSTER_NAME);
+    _numDbs = 5;
+    super.beforeClass();
   }
 
   @Test
@@ -152,17 +50,17 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i < num_dbs; i++) {
+    for (int i = 0; i < _numDbs; i++) {
       JobConfig.Builder jobConfig =
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(
-              _test_dbs.get(i))
+              _testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
-      String jobName = "job" + _test_dbs.get(i);
+      String jobName = "job" + _testDbs.get(i);
       queueBuilder.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);
     }
 
-    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(1));
     _driver.start(queueBuilder.build());
 
     String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(1));
@@ -181,11 +79,11 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i < num_dbs; i++) {
+    for (int i = 0; i < _numDbs; i++) {
       JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
-      String jobName = "job" + _test_dbs.get(i);
+      String jobName = "job" + _testDbs.get(i);
       queueBuilder.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);
     }
@@ -210,17 +108,17 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i < num_dbs; i++) {
+    for (int i = 0; i < _numDbs; i++) {
       JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
-      String jobName = "job" + _test_dbs.get(i);
+      String jobName = "job" + _testDbs.get(i);
       queueBuilder.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);
     }
 
     _driver.start(queueBuilder.build());
-    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(0));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
     TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 9df920b..f5a3441 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,42 +19,26 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
@@ -62,88 +46,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
-public class TestTaskRebalancer extends ZkIntegrationTestBase {
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+public class TestTaskRebalancer extends TaskTestBase {
   private static final String TIMEOUT_CONFIG = "Timeout";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
-        MASTER_SLAVE_STATE_MODEL);
-    setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
-          taskFactoryReg));
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    // _controller = null;
-    for (int i = 0; i < n; i++) {
-      _participants[i].syncStop();
-      // _participants[i] = null;
-    }
-  }
 
   @Test
   public void basic() throws Exception {
@@ -214,7 +118,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
     // Ensure all partitions are completed individually
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
-    for (int i = 0; i < NUM_PARTITIONS; i++) {
+    for (int i = 0; i < _numParitions; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
@@ -287,7 +191,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Check that all partitions timed out up to maxAttempts
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     int maxAttempts = 0;
-    for (int i = 0; i < NUM_PARTITIONS; i++) {
+    for (int i = 0; i < _numParitions; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
       if (state != null) {
         Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index 8051b2f..9d98ba9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -19,118 +19,27 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Sets;
 
-public class TestTaskRebalancerFailover extends ZkUnitTestBase {
+public class TestTaskRebalancerFailover extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerFailover.class);
 
-  private final String _clusterName = TestHelper.getTestClassName();
-  private static final int _n = 5;
-  private static final int _p = 20;
-  private static final int _r = 3;
-  private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
-  private ClusterControllerManager _controller;
-  private HelixManager _manager;
-  private TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    ClusterSetup setup = new ClusterSetup(_gZkClient);
-    setup.addCluster(_clusterName, true);
-    for (int i = 0; i < _n; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-      setup.addInstanceToCluster(_clusterName, instanceName);
-    }
-
-    // Set up target db
-    setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
-    setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < _n; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
-          taskFactoryReg));
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = "controller";
-    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                _clusterName));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < _n; i++) {
-      if (_participants[i] != null && _participants[i].isConnected()) {
-        _participants[i].syncStop();
-      }
-    }
-  }
-
   @Test
   public void test() throws Exception {
     String queueName = TestHelper.getTestMethodName();
@@ -159,7 +68,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
         accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
     JobContext ctx = _driver.getJobContext(namespacedJob1);
     Set<String> failOverPartitions = Sets.newHashSet();
-    for (int p = 0; p < _p; p++) {
+    for (int p = 0; p < _numParitions; p++) {
       String instanceName = ctx.getAssignedParticipant(p);
       Assert.assertNotNull(instanceName);
       String partitionName = ctx.getTargetForPartition(p);
@@ -185,7 +94,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     // tasks previously assigned to localhost_12918 should be re-scheduled on new master
     ctx = _driver.getJobContext(namespacedJob2);
     ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
-    for (int p = 0; p < _p; p++) {
+    for (int p = 0; p < _numParitions; p++) {
       String partitionName = ctx.getTargetForPartition(p);
       Assert.assertNotNull(partitionName);
       if (failOverPartitions.contains(partitionName)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index b091748..dbc4154 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -20,121 +20,23 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final List<String> testDbNames =
-      Arrays.asList("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_4");
-
-
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
+public class TestTaskRebalancerParallel extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    for (String testDbName : testDbNames) {
-      setupTool.addResourceToCluster(CLUSTER_NAME, testDbName, NUM_PARTITIONS,
-          MASTER_SLAVE_STATE_MODEL);
-      setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDbName, NUM_REPLICAS);
-    }
-
-    // start dummy participants
-    for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      final long delay = (i + 1) * 1000L;
-      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-      taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-        @Override
-        public Task createNewTask(TaskCallbackContext context) {
-          return new MockTask(context);
-        }
-      });
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    // _controller = null;
-    for (int i = 0; i < n; i++) {
-      _participants[i].syncStop();
-      // _participants[i] = null;
-    }
+   _numDbs = 4;
+    super.beforeClass();
   }
 
   @Test public void test() throws Exception {
@@ -151,7 +53,7 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
     _driver.createQueue(queue);
 
     List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
-    for (String testDbName : testDbNames) {
+    for (String testDbName : _testDbs) {
       jobConfigBuilders.add(
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
               .setTargetPartitionStates(Collections.singleton("SLAVE")));

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index e576304..a277358 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -22,11 +22,9 @@ package org.apache.helix.integration.task;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
@@ -45,35 +43,26 @@ import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
  * Test task will be retried up to MaxAttemptsPerTask {@see HELIX-562}
  */
-public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
-  private final String _clusterName = TestHelper.getTestClassName();
-  private static final int _n = 5;
-  private static final int _p = 20;
-  private static final int _r = 3;
-  private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
-  private ClusterControllerManager _controller;
-  private HelixManager _manager;
-  private TaskDriver _driver;
+public class TestTaskRebalancerRetryLimit extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
     ClusterSetup setup = new ClusterSetup(_gZkClient);
-    setup.addCluster(_clusterName, true);
-    for (int i = 0; i < _n; i++) {
+    setup.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      setup.addInstanceToCluster(_clusterName, instanceName);
+      setup.addInstanceToCluster(CLUSTER_NAME, instanceName);
     }
 
     // Set up target db
-    setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
-    setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
+    setup.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, "MasterSlave");
+    setup.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("ErrorTask", new TaskFactory() {
@@ -84,9 +73,9 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     });
 
     // start dummy participants
-    for (int i = 0; i < _n; i++) {
+    for (int i = 0; i < _numNodes; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
 
       // Register a Task state model factory.
       StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -97,12 +86,12 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
 
     // start controller
     String controllerName = "controller";
-    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     // create cluster manager
     _manager =
-        HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
             ZK_ADDR);
     _manager.connect();
     _driver = new TaskDriver(_manager);
@@ -110,20 +99,10 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     boolean result =
         ClusterStateVerifier
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                _clusterName));
+                CLUSTER_NAME));
     Assert.assertTrue(result);
   }
 
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < _n; i++) {
-      if (_participants[i] != null && _participants[i].isConnected()) {
-        _participants[i].syncStop();
-      }
-    }
-  }
 
   @Test public void test() throws Exception {
     String jobResource = TestHelper.getTestMethodName();
@@ -141,7 +120,7 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
 
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
-    for (int i = 0; i < _p; i++) {
+    for (int i = 0; i < _numParitions; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
       if (state != null) {
         Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 30cb460..e92a129 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,141 +21,39 @@ package org.apache.helix.integration.task;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
+public class TestTaskRebalancerStopResume extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
   private static final String TIMEOUT_CONFIG = "Timeout";
-  private static final String TGT_DB = "TestDB";
   private static final String JOB_RESOURCE = "SomeJob";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
-    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg
-        .put(MockTask.TASK_COMMAND, new TaskFactory() {
-          @Override public Task createNewTask(TaskCallbackContext context) {
-            return new MockTask(context);
-          }
-        });
-
-    // start dummy participants
-    for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
-          taskFactoryReg));
-
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
-            ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < n; i++) {
-      _participants[i].syncStop();
-    }
-  }
 
   @Test public void stopAndResume() throws Exception {
     Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));

http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index 2e53b36..b43c49e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -19,135 +19,25 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import com.google.common.collect.Sets;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.Calendar;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.TargetState;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
 
-public class TestUpdateWorkflow extends ZkIntegrationTestBase {
+public class TestUpdateWorkflow extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestUpdateWorkflow.class);
-  private static final int n = 5;
-  private static final int START_PORT = 12918;
-  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  private static final String TIMEOUT_CONFIG = "Timeout";
-  private static final int NUM_PARTITIONS = 20;
-  private static final int NUM_REPLICAS = 3;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
-
-  private HelixManager _manager;
-  private TaskDriver _driver;
-  private ZKHelixDataAccessor _accessor;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-
-    _accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < n; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // Set up target db
-    setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
-        MASTER_SLAVE_STATE_MODEL);
-    setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < n; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
-          taskFactoryReg));
-
-      _participants[i].syncStart();
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // create cluster manager
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
-            ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
-            ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    _controller.syncStop();
-    for (int i = 0; i < n; i++) {
-      _participants[i].syncStop();
-    }
-  }
 
   @Test
   public void testUpdateRunningQueue() throws InterruptedException {