You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:14 UTC
[18/33] helix git commit: Refactor tests with TaskTestBase and remove
duplicated code.
Refactor tests with TaskTestBase and remove duplicated code.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/99a40083
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/99a40083
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/99a40083
Branch: refs/heads/helix-0.6.x
Commit: 99a40083ab7bc1e1480d66107fff83f0479fa068
Parents: 9f80206
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Apr 12 11:48:54 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 15:02:05 2016 -0700
----------------------------------------------------------------------
.../helix/integration/task/TestGenericJobs.java | 99 +------------
.../task/TestIndependentTaskRebalancer.java | 19 +--
.../integration/task/TestRecurringJobQueue.java | 113 +--------------
.../task/TestRunJobsWithMissingTarget.java | 138 +++----------------
.../integration/task/TestTaskRebalancer.java | 102 +-------------
.../task/TestTaskRebalancerFailover.java | 97 +------------
.../task/TestTaskRebalancerParallel.java | 106 +-------------
.../task/TestTaskRebalancerRetryLimit.java | 45 ++----
.../task/TestTaskRebalancerStopResume.java | 106 +-------------
.../integration/task/TestUpdateWorkflow.java | 120 +---------------
10 files changed, 58 insertions(+), 887 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
index d96acd9..426bade 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
@@ -19,110 +19,19 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestGenericJobs extends ZkIntegrationTestBase {
+public class TestGenericJobs extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestGenericJobs.class);
- private static final int num_nodes = 5;
- private static final int START_PORT = 12918;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
- private ClusterControllerManager _controller;
- private ClusterSetup _setupTool;
-
- private HelixManager _manager;
- private TaskDriver _driver;
-
- @BeforeClass public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- _setupTool = new ClusterSetup(ZK_ADDR);
- _setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < num_nodes; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < num_nodes; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task",
- new TaskStateModelFactory(_participants[i], taskFactoryReg));
-
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager = HelixManagerFactory
- .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
- _manager.connect();
-
- _driver = new TaskDriver(_manager);
-
- boolean result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < num_nodes; i++) {
- _participants[i].syncStop();
- }
- _setupTool.deleteCluster(CLUSTER_NAME);
- }
@Test public void testGenericJobs() throws Exception {
String queueName = TestHelper.getTestMethodName();
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 046281e..0e598c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -58,17 +58,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
+public class TestIndependentTaskRebalancer extends TaskTestBase {
private Set<String> _invokedClasses = Sets.newHashSet();
private Map<String, Integer> _runCounts = Maps.newHashMap();
- private HelixManager _manager;
- private TaskDriver _driver;
@BeforeClass
public void beforeClass() throws Exception {
@@ -80,14 +73,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
// Setup cluster and instances
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ for (int i = 0; i < _numNodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
// start dummy participants
- for (int i = 0; i < n; i++) {
- final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ for (int i = 0; i < _numNodes; i++) {
+ final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
// Set task callbacks
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
@@ -226,7 +219,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
Map<String, String> taskConfigMap = Maps.newHashMap(
- ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
+ ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index b2e61ca..8262b9b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -20,140 +20,29 @@ package org.apache.helix.integration.task;
*/
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TargetState;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-public class TestRecurringJobQueue extends ZkIntegrationTestBase {
+public class TestRecurringJobQueue extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestRecurringJobQueue.class);
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
private static final String TIMEOUT_CONFIG = "Timeout";
- private static final String TGT_DB = "TestDB";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
-
- private HelixManager _manager;
- private TaskDriver _driver;
- private ZKHelixDataAccessor _accessor;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- _accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- // Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
- taskFactoryReg));
-
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
-
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
- ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < n; i++) {
- _participants[i].syncStop();
- }
- }
-
-
@Test
public void deleteRecreateRecurrentQueue() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index bd05f81..5a07942 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -19,128 +19,26 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import com.google.common.collect.Sets;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Sets;
-public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
+public class TestRunJobsWithMissingTarget extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class);
- private static final int num_nodes = 5;
- private static final int num_dbs = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
- private ClusterControllerManager _controller;
- private ClusterSetup _setupTool;
-
- private List<String> _test_dbs = new ArrayList<String>();
-
- private HelixManager _manager;
- private TaskDriver _driver;
@BeforeClass
public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- _setupTool = new ClusterSetup(ZK_ADDR);
- _setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < num_nodes; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- // Set up target dbs
- for (int i = 0; i < num_dbs; i++) {
- String db = "TestDB" + i;
- _setupTool
- .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
- IdealState.RebalanceMode.FULL_AUTO.toString());
- _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
- _test_dbs.add(db);
- }
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < num_nodes; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task",
- new TaskStateModelFactory(_participants[i], taskFactoryReg));
-
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager = HelixManagerFactory
- .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
- _manager.connect();
-
- _driver = new TaskDriver(_manager);
-
- boolean result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < num_nodes; i++) {
- _participants[i].syncStop();
- }
- _setupTool.deleteCluster(CLUSTER_NAME);
+ _numDbs = 5;
+ super.beforeClass();
}
@Test
@@ -152,17 +50,17 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i < num_dbs; i++) {
+ for (int i = 0; i < _numDbs; i++) {
JobConfig.Builder jobConfig =
new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(
- _test_dbs.get(i))
+ _testDbs.get(i))
.setTargetPartitionStates(Sets.newHashSet("SLAVE"));
- String jobName = "job" + _test_dbs.get(i);
+ String jobName = "job" + _testDbs.get(i);
queueBuilder.enqueueJob(jobName, jobConfig);
currentJobNames.add(jobName);
}
- _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(1));
_driver.start(queueBuilder.build());
String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(1));
@@ -181,11 +79,11 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i < num_dbs; i++) {
+ for (int i = 0; i < _numDbs; i++) {
JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
.setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
- String jobName = "job" + _test_dbs.get(i);
+ String jobName = "job" + _testDbs.get(i);
queueBuilder.enqueueJob(jobName, jobConfig);
currentJobNames.add(jobName);
}
@@ -210,17 +108,17 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i < num_dbs; i++) {
+ for (int i = 0; i < _numDbs; i++) {
JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
.setTargetPartitionStates(Sets.newHashSet("SLAVE"));
- String jobName = "job" + _test_dbs.get(i);
+ String jobName = "job" + _testDbs.get(i);
queueBuilder.enqueueJob(jobName, jobConfig);
currentJobNames.add(jobName);
}
_driver.start(queueBuilder.build());
- _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(0));
String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 9df920b..f5a3441 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,42 +19,26 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Joiner;
@@ -62,88 +46,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-public class TestTaskRebalancer extends ZkIntegrationTestBase {
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+public class TestTaskRebalancer extends TaskTestBase {
private static final String TIMEOUT_CONFIG = "Timeout";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
-
- private HelixManager _manager;
- private TaskDriver _driver;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- // Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
- MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
- taskFactoryReg));
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- // _controller = null;
- for (int i = 0; i < n; i++) {
- _participants[i].syncStop();
- // _participants[i] = null;
- }
- }
@Test
public void basic() throws Exception {
@@ -214,7 +118,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
// Ensure all partitions are completed individually
JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
- for (int i = 0; i < NUM_PARTITIONS; i++) {
+ for (int i = 0; i < _numParitions; i++) {
Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
}
@@ -287,7 +191,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
// Check that all partitions timed out up to maxAttempts
JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
int maxAttempts = 0;
- for (int i = 0; i < NUM_PARTITIONS; i++) {
+ for (int i = 0; i < _numParitions; i++) {
TaskPartitionState state = ctx.getPartitionState(i);
if (state != null) {
Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index 8051b2f..9d98ba9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -19,118 +19,27 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
-public class TestTaskRebalancerFailover extends ZkUnitTestBase {
+public class TestTaskRebalancerFailover extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestTaskRebalancerFailover.class);
- private final String _clusterName = TestHelper.getTestClassName();
- private static final int _n = 5;
- private static final int _p = 20;
- private static final int _r = 3;
- private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
- private ClusterControllerManager _controller;
- private HelixManager _manager;
- private TaskDriver _driver;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- ClusterSetup setup = new ClusterSetup(_gZkClient);
- setup.addCluster(_clusterName, true);
- for (int i = 0; i < _n; i++) {
- String instanceName = "localhost_" + (12918 + i);
- setup.addInstanceToCluster(_clusterName, instanceName);
- }
-
- // Set up target db
- setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
- setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < _n; i++) {
- String instanceName = "localhost_" + (12918 + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
- taskFactoryReg));
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = "controller";
- _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- _clusterName));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < _n; i++) {
- if (_participants[i] != null && _participants[i].isConnected()) {
- _participants[i].syncStop();
- }
- }
- }
-
@Test
public void test() throws Exception {
String queueName = TestHelper.getTestMethodName();
@@ -159,7 +68,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
JobContext ctx = _driver.getJobContext(namespacedJob1);
Set<String> failOverPartitions = Sets.newHashSet();
- for (int p = 0; p < _p; p++) {
+ for (int p = 0; p < _numParitions; p++) {
String instanceName = ctx.getAssignedParticipant(p);
Assert.assertNotNull(instanceName);
String partitionName = ctx.getTargetForPartition(p);
@@ -185,7 +94,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
// tasks previously assigned to localhost_12918 should be re-scheduled on new master
ctx = _driver.getJobContext(namespacedJob2);
ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB));
- for (int p = 0; p < _p; p++) {
+ for (int p = 0; p < _numParitions; p++) {
String partitionName = ctx.getTargetForPartition(p);
Assert.assertNotNull(partitionName);
if (failOverPartitions.contains(partitionName)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index b091748..dbc4154 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -20,121 +20,23 @@ package org.apache.helix.integration.task;
*/
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final List<String> testDbNames =
- Arrays.asList("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_4");
-
-
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
-
- private HelixManager _manager;
- private TaskDriver _driver;
+public class TestTaskRebalancerParallel extends TaskTestBase {
@BeforeClass
public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- for (String testDbName : testDbNames) {
- setupTool.addResourceToCluster(CLUSTER_NAME, testDbName, NUM_PARTITIONS,
- MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDbName, NUM_REPLICAS);
- }
-
- // start dummy participants
- for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- final long delay = (i + 1) * 1000L;
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task",
- new TaskStateModelFactory(_participants[i], taskFactoryReg));
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- // _controller = null;
- for (int i = 0; i < n; i++) {
- _participants[i].syncStop();
- // _participants[i] = null;
- }
+ _numDbs = 4;
+ super.beforeClass();
}
@Test public void test() throws Exception {
@@ -151,7 +53,7 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
_driver.createQueue(queue);
List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
- for (String testDbName : testDbNames) {
+ for (String testDbName : _testDbs) {
jobConfigBuilders.add(
new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
.setTargetPartitionStates(Collections.singleton("SLAVE")));
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index e576304..a277358 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -22,11 +22,9 @@ package org.apache.helix.integration.task;
import java.util.HashMap;
import java.util.Map;
-import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.participant.StateMachineEngine;
@@ -45,35 +43,26 @@ import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Test task will be retried up to MaxAttemptsPerTask {@see HELIX-562}
*/
-public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
- private final String _clusterName = TestHelper.getTestClassName();
- private static final int _n = 5;
- private static final int _p = 20;
- private static final int _r = 3;
- private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
- private ClusterControllerManager _controller;
- private HelixManager _manager;
- private TaskDriver _driver;
+public class TestTaskRebalancerRetryLimit extends TaskTestBase {
@BeforeClass
public void beforeClass() throws Exception {
ClusterSetup setup = new ClusterSetup(_gZkClient);
- setup.addCluster(_clusterName, true);
- for (int i = 0; i < _n; i++) {
+ setup.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < _numNodes; i++) {
String instanceName = "localhost_" + (12918 + i);
- setup.addInstanceToCluster(_clusterName, instanceName);
+ setup.addInstanceToCluster(CLUSTER_NAME, instanceName);
}
// Set up target db
- setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
- setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
+ setup.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions, "MasterSlave");
+ setup.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
taskFactoryReg.put("ErrorTask", new TaskFactory() {
@@ -84,9 +73,9 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
});
// start dummy participants
- for (int i = 0; i < _n; i++) {
+ for (int i = 0; i < _numNodes; i++) {
String instanceName = "localhost_" + (12918 + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
// Register a Task state model factory.
StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -97,12 +86,12 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
// start controller
String controllerName = "controller";
- _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
// create cluster manager
_manager =
- HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
ZK_ADDR);
_manager.connect();
_driver = new TaskDriver(_manager);
@@ -110,20 +99,10 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
boolean result =
ClusterStateVerifier
.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- _clusterName));
+ CLUSTER_NAME));
Assert.assertTrue(result);
}
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < _n; i++) {
- if (_participants[i] != null && _participants[i].isConnected()) {
- _participants[i].syncStop();
- }
- }
- }
@Test public void test() throws Exception {
String jobResource = TestHelper.getTestMethodName();
@@ -141,7 +120,7 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
- for (int i = 0; i < _p; i++) {
+ for (int i = 0; i < _numParitions; i++) {
TaskPartitionState state = ctx.getPartitionState(i);
if (state != null) {
Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 30cb460..e92a129 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,141 +21,39 @@ package org.apache.helix.integration.task;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.ScheduleConfig;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
+public class TestTaskRebalancerStopResume extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
private static final String TIMEOUT_CONFIG = "Timeout";
- private static final String TGT_DB = "TestDB";
private static final String JOB_RESOURCE = "SomeJob";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
-
- private HelixManager _manager;
- private TaskDriver _driver;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- // Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg
- .put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
- taskFactoryReg));
-
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
-
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
- ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < n; i++) {
- _participants[i].syncStop();
- }
- }
@Test public void stopAndResume() throws Exception {
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
http://git-wip-us.apache.org/repos/asf/helix/blob/99a40083/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index 2e53b36..b43c49e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -19,135 +19,25 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import com.google.common.collect.Sets;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import java.util.Calendar;
+import java.util.concurrent.TimeUnit;
+
import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.ScheduleConfig;
import org.apache.helix.task.TargetState;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
-public class TestUpdateWorkflow extends ZkIntegrationTestBase {
+public class TestUpdateWorkflow extends TaskTestBase {
private static final Logger LOG = Logger.getLogger(TestUpdateWorkflow.class);
- private static final int n = 5;
- private static final int START_PORT = 12918;
- private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
- private static final String TIMEOUT_CONFIG = "Timeout";
- private static final int NUM_PARTITIONS = 20;
- private static final int NUM_REPLICAS = 3;
- private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final MockParticipantManager[] _participants = new MockParticipantManager[n];
- private ClusterControllerManager _controller;
-
- private HelixManager _manager;
- private TaskDriver _driver;
- private ZKHelixDataAccessor _accessor;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursive(namespace);
- }
-
- _accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < n; i++) {
- String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
- }
-
- // Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
- MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
- Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new MockTask(context);
- }
- });
-
- // start dummy participants
- for (int i = 0; i < n; i++) {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
- // Register a Task state model factory.
- StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
- stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
- taskFactoryReg));
-
- _participants[i].syncStart();
- }
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // create cluster manager
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
- ZK_ADDR);
- _manager.connect();
-
- _driver = new TaskDriver(_manager);
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
- ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- }
-
- @AfterClass
- public void afterClass() throws Exception {
- _manager.disconnect();
- _controller.syncStop();
- for (int i = 0; i < n; i++) {
- _participants[i].syncStop();
- }
- }
@Test
public void testUpdateRunningQueue() throws InterruptedException {