You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/06/16 23:36:10 UTC
[1/4] helix git commit: [HELIX-618] Job hung if the target resource
does not exist anymore at the time when it is scheduled.
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 6b6bb8f60 -> fe540ac9e
[HELIX-618] Job hung if the target resource does not exist anymore at the time when it is scheduled.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fe540ac9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fe540ac9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fe540ac9
Branch: refs/heads/helix-0.6.x
Commit: fe540ac9ec93fb3fb1caa71acaede9c3a63e9fd4
Parents: d381a3a
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 16:34:31 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700
----------------------------------------------------------------------
.../FixedTargetTaskAssignmentCalculator.java | 14 +-
.../org/apache/helix/task/JobRebalancer.java | 51 +++--
.../org/apache/helix/task/TaskRebalancer.java | 41 ++--
.../apache/helix/task/WorkflowRebalancer.java | 31 +--
.../task/TestRunJobsWithMissingTarget.java | 214 +++++++++++++++++++
5 files changed, 306 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 8760524..60cd92f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.ResourceAssignment;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.log4j.Logger;
/**
* A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
@@ -44,6 +45,7 @@ import com.google.common.collect.Sets;
* (if desired) only where those partitions are in a given state.
*/
public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+ private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
@Override
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
@@ -58,6 +60,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
Set<Integer> partitionSet, ClusterDataCache cache) {
IdealState tgtIs = getTgtIdealState(jobCfg, cache);
if (tgtIs == null) {
+ LOG.warn("Missing target resource for the scheduled job!");
return Collections.emptyMap();
}
Set<String> tgtStates = jobCfg.getTargetPartitionStates();
@@ -78,21 +81,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
/**
* Returns the set of all partition ids for a job.
- * <p/>
* If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
* use the list of all partition ids from the target resource.
+ * Returns an empty set if the target resource does not exist.
*/
private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
JobContext taskCtx) {
- if (tgtResourceIs == null) {
- return null;
- }
Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
SortedSet<String> targetPartitions = Sets.newTreeSet();
if (jobCfg.getTargetPartitions() != null) {
targetPartitions.addAll(jobCfg.getTargetPartitions());
} else {
- targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+ if (tgtResourceIs != null) {
+ targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+ } else {
+ LOG.warn("Missing target resource for the scheduled job!");
+ }
}
Set<Integer> taskPartitions = Sets.newTreeSet();
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0e2ab15..93d4689 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -206,9 +205,23 @@ public class JobRebalancer extends TaskRebalancer {
TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
Set<Integer> allPartitions =
taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+
+ if (allPartitions == null || allPartitions.isEmpty()) {
+ // Empty target partitions, mark the job as FAILED.
+ LOG.warn(
+ "Missing task partition mapping for job " + jobResource + ", marked the job as FAILED!");
+ markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
+ markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+ return new ResourceAssignment(jobResource);
+ }
+
Map<String, SortedSet<Integer>> taskAssignments =
getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
long currentTime = System.currentTimeMillis();
+
+ LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments
+ + " excludedInstances: " + excludedInstances);
+
for (String instance : taskAssignments.keySet()) {
if (excludedInstances.contains(instance)) {
continue;
@@ -322,13 +335,7 @@ public class JobRebalancer extends TaskRebalancer {
}
if (!successOptional) {
- long finishTime = currentTime;
- workflowCtx.setJobState(jobResource, TaskState.FAILED);
- if (workflowConfig.isTerminable()) {
- workflowCtx.setWorkflowState(TaskState.FAILED);
- workflowCtx.setFinishTime(finishTime);
- }
- jobCtx.setFinishTime(finishTime);
+ markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
@@ -367,13 +374,7 @@ public class JobRebalancer extends TaskRebalancer {
scheduleForNextTask(jobResource, jobCtx, currentTime);
if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
- workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
- jobCtx.setFinishTime(currentTime);
- if (isWorkflowComplete(workflowCtx, workflowConfig)) {
- workflowCtx.setWorkflowState(TaskState.COMPLETED);
- workflowCtx.setFinishTime(currentTime);
- }
-
+ markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
// remove IdealState of this job
TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
}
@@ -428,6 +429,26 @@ public class JobRebalancer extends TaskRebalancer {
return ra;
}
+ private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ long currentTime = System.currentTimeMillis();
+ workflowContext.setJobState(jobName, TaskState.FAILED);
+ jobContext.setFinishTime(currentTime);
+ if (isWorkflowFinished(workflowContext, workflowConfig)) {
+ workflowContext.setFinishTime(currentTime);
+ }
+ }
+
+ private void markJobComplete(String jobName, JobContext jobContext,
+ WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
+ long currentTime = System.currentTimeMillis();
+ workflowContext.setJobState(jobName, TaskState.COMPLETED);
+ jobContext.setFinishTime(currentTime);
+ if (isWorkflowFinished(workflowContext, workflowConfig)) {
+ workflowContext.setFinishTime(currentTime);
+ }
+ }
+
private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
// Clear current entries if they exist and are expired
long currentTime = now;
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1526883..f35ce69 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -62,22 +62,33 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
CurrentStateOutput currStateOutput);
/**
- * Checks if the workflow has completed.
+ * Checks if the workflow has finished (either completed or failed).
+ * Set the state in workflow context properly.
*
* @param ctx Workflow context containing job states
* @param cfg Workflow config containing set of jobs
- * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+ * @return returns true if the workflow either completed (all jobs are {@link TaskState#COMPLETED})
+ * or failed (any job is {@link TaskState#FAILED}), false otherwise.
*/
- protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
- if (!cfg.isTerminable()) {
- return false;
- }
+ protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) {
+ boolean incomplete = false;
for (String job : cfg.getJobDag().getAllNodes()) {
- if (ctx.getJobState(job) != TaskState.COMPLETED) {
- return false;
+ TaskState jobState = ctx.getJobState(job);
+ if (jobState == TaskState.FAILED) {
+ ctx.setWorkflowState(TaskState.FAILED);
+ return true;
+ }
+ if (jobState != TaskState.COMPLETED) {
+ incomplete = true;
}
}
- return true;
+
+ if (!incomplete && cfg.isTerminable()) {
+ ctx.setWorkflowState(TaskState.COMPLETED);
+ return true;
+ }
+
+ return false;
}
/**
@@ -124,6 +135,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
WorkflowContext workflowCtx) {
int notStartedCount = 0;
int inCompleteCount = 0;
+ int failedCount = 0;
for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
TaskState jobState = workflowCtx.getJobState(ancestor);
@@ -131,13 +143,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
++notStartedCount;
} else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
++inCompleteCount;
+ } else if (jobState == TaskState.FAILED) {
+ ++failedCount;
}
}
- if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
- LOG.debug(String
- .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.",
- job, notStartedCount, inCompleteCount));
+ if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()
+ || failedCount > 0) {
+ LOG.debug(String.format(
+ "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.",
+ job, notStartedCount, inCompleteCount, failedCount));
return false;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 912f501..db5426c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -77,17 +77,15 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
long currentTime = System.currentTimeMillis();
- // Check if workflow is completed and mark it if it is completed.
- if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
- if (isWorkflowComplete(workflowCtx, workflowCfg)) {
- workflowCtx.setWorkflowState(TaskState.COMPLETED);
- workflowCtx.setFinishTime(currentTime);
- TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
- }
+ // Check whether the workflow has finished and, if so, record its finish time.
+ if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
+ && isWorkflowFinished(workflowCtx, workflowCfg)) {
+ workflowCtx.setFinishTime(currentTime);
+ TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
- LOG.info("Workflow " + workflow + " is completed.");
+ LOG.info("Workflow " + workflow + " is finished.");
long expiryTime = workflowCfg.getExpiry();
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
@@ -162,10 +160,19 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Set up job resource based on partitions from target resource
int numIndependentTasks = jobConfig.getTaskConfigMap().size();
- int numPartitions = (numIndependentTasks > 0) ?
- numIndependentTasks :
- admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource())
- .getPartitionSet().size();
+
+ int numPartitions = numIndependentTasks;
+ if (numPartitions == 0) {
+ IdealState targetIs =
+ admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
+ if (targetIs == null) {
+ LOG.warn("Target resource does not exist for job " + jobResource);
+ // No need to fail here; the job will be marked as failed as soon as it starts running.
+ } else {
+ numPartitions = targetIs.getPartitionSet().size();
+ }
+ }
+
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
TaskConstants.STATE_MODEL_NAME);
http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
new file mode 100644
index 0000000..74a8610
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -0,0 +1,214 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class);
+ private static final int num_nodes = 5;
+ private static final int num_dbs = 5;
+ private static final int START_PORT = 12918;
+ private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+ private static final String TIMEOUT_CONFIG = "Timeout";
+ private static final int NUM_PARTITIONS = 20;
+ private static final int NUM_REPLICAS = 3;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+ private ClusterControllerManager _controller;
+ private ClusterSetup _setupTool;
+
+ private List<String> _test_dbs = new ArrayList<String>();
+
+ private HelixManager _manager;
+ private TaskDriver _driver;
+
+ @BeforeClass public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < num_nodes; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ // Set up target dbs
+ for (int i = 0; i < num_dbs; i++) {
+ String db = "TestDB" + i;
+ _setupTool
+ .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
+ IdealState.RebalanceMode.FULL_AUTO.toString());
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+ _test_dbs.add(db);
+ }
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
+
+ // start dummy participants
+ for (int i = 0; i < num_nodes; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task",
+ new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+ _participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+
+ _driver = new TaskDriver(_manager);
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _controller.syncStop();
+ for (int i = 0; i < num_nodes; i++) {
+ _participants[i].syncStop();
+ }
+ _manager.disconnect();
+ }
+
+ private JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) {
+ Map<String, String> cfgMap = new HashMap<String, String>();
+ cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+ Calendar cal = Calendar.getInstance();
+ cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+ cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+ cal.set(Calendar.MILLISECOND, 0);
+ cfgMap.put(WorkflowConfig.START_TIME,
+ WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+ return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+ }
+
+ private JobQueue.Builder buildJobQueue(String jobQueueName) {
+ return buildJobQueue(jobQueueName, 0);
+ }
+
+ @Test public void testJobFailsWithMissingTarget() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+ String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
+ TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+ TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+ }
+
+ @Test public void testJobFailsWithMissingTargetInRunning() throws Exception {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i < num_dbs; i++) {
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+ .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+ String jobName = "job" + _test_dbs.get(i);
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.start(queueBuilder.build());
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
+
+ String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
+ TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+ TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+ }
+}
[2/4] helix git commit: [HELIX-623] Do not expose internal
configuration field name. Client should use JobConfig.Builder to create
jobConfig.
Posted by lx...@apache.org.
[HELIX-623] Do not expose internal configuration field name. Client should use JobConfig.Builder to create jobConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c490fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c490fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c490fa
Branch: refs/heads/helix-0.6.x
Commit: 79c490fab080494b68a1f52845c1e708b8881439
Parents: 2409601
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 15:59:37 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/helix/model/ResourceConfig.java | 61 +++++
.../java/org/apache/helix/task/JobConfig.java | 258 +++++++++++--------
.../java/org/apache/helix/task/TaskDriver.java | 1 +
.../java/org/apache/helix/task/TaskUtil.java | 21 +-
.../java/org/apache/helix/task/Workflow.java | 37 +--
.../task/TestIndependentTaskRebalancer.java | 93 +++----
.../integration/task/TestRecurringJobQueue.java | 4 +-
.../integration/task/TestTaskRebalancer.java | 39 +--
.../task/TestTaskRebalancerRetryLimit.java | 18 +-
.../task/TestTaskRebalancerStopResume.java | 10 +-
.../integration/task/WorkflowGenerator.java | 56 ++--
11 files changed, 354 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 98433f5..d58126d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -23,6 +23,9 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
+import java.util.Collections;
+import java.util.Map;
+
/**
* Resource configurations
*/
@@ -73,6 +76,64 @@ public class ResourceConfig extends HelixProperty {
.setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
}
+ /**
+ * Put a set of simple configs.
+ *
+ * @param configsMap
+ */
+ public void putSimpleConfigs(Map<String, String> configsMap) {
+ getRecord().getSimpleFields().putAll(configsMap);
+ }
+
+ /**
+ * Get all simple configurations.
+ *
+ * @return all simple configurations.
+ */
+ public Map<String, String> getSimpleConfigs() {
+ return Collections.unmodifiableMap(getRecord().getSimpleFields());
+ }
+
+ /**
+ * Put a single simple config value.
+ *
+ * @param configKey
+ * @param configVal
+ */
+ public void putSimpleConfig(String configKey, String configVal) {
+ getRecord().getSimpleFields().put(configKey, configVal);
+ }
+
+ /**
+ * Get a single simple config value.
+ *
+ * @param configKey
+ * @return configuration value, or null if it does not exist.
+ */
+ public String getSimpleConfig(String configKey) {
+ return getRecord().getSimpleFields().get(configKey);
+ }
+
+ /**
+ * Put a single map config.
+ *
+ * @param configKey
+ * @param configValMap
+ */
+ public void putMapConfig(String configKey, Map<String, String> configValMap) {
+ getRecord().setMapField(configKey, configValMap);
+ }
+
+ /**
+ * Get a single map config.
+ *
+ * @param configKey
+ * @return configuration value map, or null if it does not exist.
+ */
+ public Map<String, String> getMapConfig(String configKey) {
+ return getRecord().getMapField(configKey);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceConfig) {
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index c7c2f38..37a2f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -36,49 +36,87 @@ import com.google.common.collect.Maps;
* Provides a typed interface to job configurations.
*/
public class JobConfig {
- // // Property names ////
-
- /** The name of the workflow to which the job belongs. */
- public static final String WORKFLOW_ID = "WorkflowID";
- /** The assignment strategy of this job */
- public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
- /** The name of the target resource. */
- public static final String TARGET_RESOURCE = "TargetResource";
- /**
- * The set of the target partition states. The value must be a comma-separated list of partition
- * states.
- */
- public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+
/**
- * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+ * Do not use these property names directly; always use the get/set methods in JobConfig and JobConfig.Builder.
*/
- public static final String TARGET_PARTITIONS = "TargetPartitions";
- /** The command that is to be run by participants in the case of identical tasks. */
- public static final String COMMAND = "Command";
- /** The command configuration to be used by the tasks. */
- public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
- /** The timeout for a task. */
- public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
- /** The maximum number of times the task rebalancer may attempt to execute a task. */
- public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
- /** The maximum number of times Helix will intentionally move a failing task */
- public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
- /** The number of concurrent tasks that are allowed to run on an instance. */
- public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
- /** The number of tasks within the job that are allowed to fail. */
- public static final String FAILURE_THRESHOLD = "FailureThreshold";
- /** The amount of time in ms to wait before retrying a task */
- public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
-
- /** The individual task configurations, if any **/
- public static final String TASK_CONFIGS = "TaskConfigs";
-
- /** Disable external view (not showing) for this job resource */
- public static final String DISABLE_EXTERNALVIEW = "DisableExternalView";
-
-
- // // Default property values ////
+ protected enum JobConfigProperty {
+ /**
+ * The name of the workflow to which the job belongs.
+ */
+ WORKFLOW_ID("WorkflowID"),
+ /**
+ * The assignment strategy of this job
+ */
+ ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+ /**
+ * The name of the target resource.
+ */
+ TARGET_RESOURCE("TargetResource"),
+ /**
+ * The set of the target partition states. The value must be a comma-separated list of partition
+ * states.
+ */
+ TARGET_PARTITION_STATES("TargetPartitionStates"),
+ /**
+ * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+ */
+ TARGET_PARTITIONS("TargetPartitions"),
+ /**
+ * The command that is to be run by participants in the case of identical tasks.
+ */
+ COMMAND("Command"),
+ /**
+ * The command configuration to be used by the tasks.
+ */
+ JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+ /**
+ * The timeout for a task.
+ */
+ TIMEOUT_PER_TASK("TimeoutPerPartition"),
+ /**
+ * The maximum number of times the task rebalancer may attempt to execute a task.
+ */
+ MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+ /**
+ * The maximum number of times Helix will intentionally move a failing task
+ */
+ MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+ /**
+ * The number of concurrent tasks that are allowed to run on an instance.
+ */
+ NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+ /**
+ * The number of tasks within the job that are allowed to fail.
+ */
+ FAILURE_THRESHOLD("FailureThreshold"),
+ /**
+ * The amount of time in ms to wait before retrying a task
+ */
+ TASK_RETRY_DELAY("TaskRetryDelay"),
+
+ /**
+ * The individual task configurations, if any
+ */
+ TASK_CONFIGS("TaskConfigs"),
+
+ /**
+ * Disable external view (not showing) for this job resource
+ */
+ DISABLE_EXTERNALVIEW("DisableExternalView");
+
+ private final String _value;
+
+ private JobConfigProperty(String val) {
+ _value = val;
+ }
+
+ public String value() {
+ return _value;
+ }
+ }
+ //Default property values
public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -106,8 +144,7 @@ public class JobConfig {
Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
- boolean disableExternalView,
- Map<String, TaskConfig> taskConfigMap) {
+ boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -190,34 +227,39 @@ public class JobConfig {
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+ cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
if (_command != null) {
- cfgMap.put(JobConfig.COMMAND, _command);
+ cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
}
if (_jobCommandConfigMap != null) {
String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
if (serializedConfig != null) {
- cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
+ cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
}
}
if (_targetResource != null) {
- cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+ cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
}
if (_targetPartitionStates != null) {
- cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+ cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+ Joiner.on(",").join(_targetPartitionStates));
}
if (_targetPartitions != null) {
- cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+ cfgMap
+ .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
}
if (_retryDelay > 0) {
- cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+ cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
}
- cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
- cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
- cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
- cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
- cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
- cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
+ cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
+ cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
+ cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+ "" + _maxForcedReassignmentsPerTask);
+ cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
+ cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+ Boolean.toString(_disableExternalView));
+ cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+ "" + _numConcurrentTasksPerInstance);
return cfgMap;
}
@@ -251,54 +293,58 @@ public class JobConfig {
/**
* Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}.
+ *
* @param cfg A map of property names to their string representations.
* @return A {@link Builder}.
*/
public static Builder fromMap(Map<String, String> cfg) {
Builder b = new Builder();
- if (cfg.containsKey(WORKFLOW_ID)) {
- b.setWorkflow(cfg.get(WORKFLOW_ID));
+ if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
+ b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
}
- if (cfg.containsKey(TARGET_RESOURCE)) {
- b.setTargetResource(cfg.get(TARGET_RESOURCE));
+ if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
+ b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
}
- if (cfg.containsKey(TARGET_PARTITIONS)) {
- b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+ if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
+ b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
}
- if (cfg.containsKey(TARGET_PARTITION_STATES)) {
- b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
- TARGET_PARTITION_STATES).split(","))));
+ if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+ b.setTargetPartitionStates(new HashSet<String>(
+ Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
}
- if (cfg.containsKey(COMMAND)) {
- b.setCommand(cfg.get(COMMAND));
+ if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
+ b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
}
- if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
- Map<String, String> commandConfigMap =
- TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+ if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+ Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
+ cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
b.setJobCommandConfigMap(commandConfigMap);
}
- if (cfg.containsKey(TIMEOUT_PER_TASK)) {
- b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
+ b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
}
- if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
- b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
- .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+ if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+ b.setNumConcurrentTasksPerInstance(
+ Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
}
- if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
- b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+ b.setMaxAttemptsPerTask(
+ Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
}
- if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
- b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
- .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+ b.setMaxForcedReassignmentsPerTask(
+ Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
}
- if (cfg.containsKey(FAILURE_THRESHOLD)) {
- b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+ if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+ b.setFailureThreshold(
+ Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
}
- if (cfg.containsKey(TASK_RETRY_DELAY)) {
- b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+ if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
+ b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
}
- if (cfg.containsKey(DISABLE_EXTERNALVIEW)) {
- b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW)));
+ if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+ b.setDisableExternalView(
+ Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
}
return b;
}
@@ -384,38 +430,46 @@ public class JobConfig {
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
}
- if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
- && _targetPartitionStates.isEmpty()) {
- throw new IllegalArgumentException(String.format("%s cannot be an empty set",
- TARGET_PARTITION_STATES));
+ if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+ .isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
}
if (_taskConfigMap.isEmpty() && _command == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.COMMAND));
}
if (_timeoutPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- TIMEOUT_PER_TASK, _timeoutPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+ _timeoutPerTask));
}
if (_numConcurrentTasksPerInstance < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ _numConcurrentTasksPerInstance));
}
if (_maxAttemptsPerTask < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+ _maxAttemptsPerTask));
}
if (_maxForcedReassignmentsPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ _maxForcedReassignmentsPerTask));
}
if (_failureThreshold < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- FAILURE_THRESHOLD, _failureThreshold));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+ _failureThreshold));
}
if (_workflow == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 9b64aec..c4986ee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -55,6 +55,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d804fab..524b889 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -402,10 +402,10 @@ public class TaskUtil {
Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
- Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+ Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
// Set the workflow expiry
- builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+ workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
// Set the schedule, if applicable
ScheduleConfig scheduleConfig;
@@ -415,7 +415,7 @@ public class TaskUtil {
scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
}
if (scheduleConfig != null) {
- builder.setScheduleConfig(scheduleConfig);
+ workflowBuilder.setScheduleConfig(scheduleConfig);
}
// Add each job back as long as the original exists
@@ -426,29 +426,30 @@ public class TaskUtil {
String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
- jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
- for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
- builder.addConfig(job, e.getKey(), e.getValue());
- }
+
+ JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+ jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
- builder.addTaskConfigs(job, taskConfigs);
+ jobCfgBuilder.addTaskConfigs(taskConfigs);
+ workflowBuilder.addJobConfig(job, jobCfgBuilder);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
if (children != null) {
for (String namespacedChild : children) {
String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
- builder.addParentChildDependency(job, child);
+ workflowBuilder.addParentChildDependency(job, child);
}
}
}
}
- return builder.build();
+ return workflowBuilder.build();
}
private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 8ea2691..3a050c2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -128,7 +128,9 @@ public class Workflow {
return parse(new StringReader(yaml));
}
- /** Helper function to parse workflow from a generic {@link Reader} */
+ /**
+ * Helper function to parse workflow from a generic {@link Reader}
+ */
private static Workflow parse(Reader reader) throws Exception {
Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
@@ -146,29 +148,32 @@ public class Workflow {
}
}
- builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
- builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
if (job.jobConfigMap != null) {
builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
}
- builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+ job.targetResource);
if (job.targetPartitionStates != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
Joiner.on(",").join(job.targetPartitionStates));
}
if (job.targetPartitions != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
Joiner.on(",").join(job.targetPartitions));
}
- builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
String.valueOf(job.maxAttemptsPerTask));
- builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ builder.addConfig(job.name,
+ JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
String.valueOf(job.maxForcedReassignmentsPerTask));
- builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ builder.addConfig(job.name,
+ JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
String.valueOf(job.numConcurrentTasksPerInstance));
- builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
String.valueOf(job.timeoutPerPartition));
- builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
String.valueOf(job.failureThreshold));
if (job.tasks != null) {
List<TaskConfig> taskConfigs = Lists.newArrayList();
@@ -242,7 +247,7 @@ public class Workflow {
_expiry = -1;
}
- public Builder addConfig(String job, String key, String val) {
+ private Builder addConfig(String job, String key, String val) {
job = namespacify(job);
_dag.addNode(job);
if (!_jobConfigs.containsKey(job)) {
@@ -252,8 +257,8 @@ public class Workflow {
return this;
}
- public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
- return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+ private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
+ return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
}
@@ -268,7 +273,7 @@ public class Workflow {
return this;
}
- public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+ private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
job = namespacify(job);
_dag.addNode(job);
if (!_taskConfigs.containsKey(job)) {
@@ -322,7 +327,7 @@ public class Workflow {
protected WorkflowConfig.Builder buildWorkflowConfig() {
for (String task : _jobConfigs.keySet()) {
// addConfig(task, TaskConfig.WORKFLOW_ID, _name);
- _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
+ _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
}
WorkflowConfig.Builder builder;
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index a00a736..40c2485 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -140,8 +140,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_runCounts.clear();
}
- @Test
- public void testDifferentTasks() throws Exception {
+ @Test public void testDifferentTasks() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -150,11 +149,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -166,8 +166,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testThresholdFailure() throws Exception {
+ @Test public void testThresholdFailure() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -177,12 +176,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
+ .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -194,8 +193,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testOptionalTaskFailure() throws Exception {
+ @Test public void testOptionalTaskFailure() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -205,11 +203,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -221,24 +222,23 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testReassignment() throws Exception {
+ @Test public void testReassignment() throws Exception {
final int NUM_INSTANCES = 2;
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- Map<String, String> taskConfigMap =
- Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
- + START_PORT));
+ Map<String, String> taskConfigMap = Maps.newHashMap(
+ ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
- + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -251,8 +251,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
// Ensure that this was tried on two different instances, the first of which exhausted the
// attempts number, and the other passes on the first try
Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
- Assert.assertTrue(_runCounts.values().contains(
- JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+ Assert.assertTrue(
+ _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
Assert.assertTrue(_runCounts.values().contains(1));
}
@@ -264,11 +264,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Map<String, String> taskConfigMap = Maps.newHashMap();
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
_driver.start(workflowBuilder.build());
@@ -295,11 +298,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Map<String, String> taskConfigMap = Maps.newHashMap();
TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
- Map<String, String> jobConfigMap = Maps.newHashMap();
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
SingleFailTask.hasFailed = false;
_driver.start(workflowBuilder.build());
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 79adcd5..da13ada 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -215,12 +215,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
- JobConfig.Builder job =
+ JobConfig.Builder jobConfig =
new JobConfig.Builder().setCommand("Reindex")
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuild.enqueueJob(jobName, job);
+ queueBuild.enqueueJob(jobName, jobConfig);
currentJobNames.add(jobName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f402b82..3352d1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -163,9 +163,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
String jobName = "Expiry";
long expiry = 1000;
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
- Workflow flow =
- WorkflowGenerator
- .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
+
+ Workflow flow = WorkflowGenerator
+ .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
.setExpiry(expiry).build();
_driver.start(flow);
@@ -204,9 +206,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
final String jobResource = "basic" + jobCompletionTime;
Map<String, String> commandConfig =
ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- commandConfig).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait for job completion
@@ -220,18 +225,20 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
}
}
- @Test
- public void partitionSet() throws Exception {
+ @Test public void partitionSet() throws Exception {
final String jobResource = "partitionSet";
ImmutableList<String> targetPartitions =
ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
// construct and submit our basic workflow
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1)
+ .setTargetPartitions(targetPartitions);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
- JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// wait for job completeness/timeout
@@ -268,13 +275,15 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
}
}
- @Test
- public void timeouts() throws Exception {
+ @Test public void timeouts() throws Exception {
final String jobResource = "timeouts";
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+ .setMaxAttemptsPerTask(2).setTimeoutPerTask(100);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait until the job reports failure.
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index b678d7e..efe90b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -125,18 +125,15 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
_manager.disconnect();
}
- @Test
- public void test() throws Exception {
+ @Test public void test() throws Exception {
String jobResource = TestHelper.getTestMethodName();
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+ .setMaxAttemptsPerTask(2).setCommand("ErrorTask").setFailureThreshold(Integer.MAX_VALUE);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(2)).build();
- Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs();
- for (Map<String, String> jobConfig : jobConfigs.values()) {
- jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
- jobConfig.put(JobConfig.COMMAND, "ErrorTask");
- }
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
@@ -151,7 +148,6 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
}
}
-
}
private static class ErrorTask implements Task {
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 8a44672..7437b72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -162,12 +162,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
_manager.disconnect();
}
- @Test
- public void stopAndResume() throws Exception {
+ @Test public void stopAndResume() throws Exception {
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+ JobConfig.Builder jobBuilder =
+ JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
- commandConfig).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(JOB_RESOURCE, jobBuilder).build();
LOG.info("Starting flow " + flow.getName());
_driver.start(flow);
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index a414f5c..23c35af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -56,58 +56,34 @@ public class WorkflowGenerator {
DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
}
- public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
- String jobName, Map<String, String> commandConfig, String... cfgs) {
- if (cfgs.length % 2 != 0) {
- throw new IllegalArgumentException(
- "Additional configs should have even number of keys and values");
- }
- Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG);
- for (int i = 0; i < cfgs.length; i += 2) {
- bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
- }
-
- return bldr;
+ private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
+ static {
+ JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+ DEFAULT_JOB_BUILDER = builder;
}
public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
- return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+ return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
}
public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
- Map<String, String> commandConfig, Map<String, String> config) {
- Workflow.Builder builder = new Workflow.Builder(jobName);
- for (String key : config.keySet()) {
- builder.addConfig(jobName, key, config.get(key));
- }
- if (commandConfig != null) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- String serializedMap = mapper.writeValueAsString(commandConfig);
- builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- } catch (IOException e) {
- LOG.error("Error serializing " + commandConfig, e);
- }
- }
- return builder;
+ JobConfig.Builder jobBuilder) {
+ return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);
}
public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
Workflow.Builder builder = new Workflow.Builder(workflowName);
builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
- for (String key : DEFAULT_JOB_CONFIG.keySet()) {
- builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
- builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
- }
- ObjectMapper mapper = new ObjectMapper();
- try {
- String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
- builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- } catch (IOException e) {
- LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
- }
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+ builder.addJobConfig(JOB_NAME_1, jobBuilder);
+ builder.addJobConfig(JOB_NAME_2, jobBuilder);
+
return builder;
}
}
[3/4] helix git commit: Clean up unit tests for task framework.
Posted by lx...@apache.org.
Clean up unit tests for task framework.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d381a3a1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d381a3a1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d381a3a1
Branch: refs/heads/helix-0.6.x
Commit: d381a3a1cc69d129388896907b9cc696811650c7
Parents: 79c490f
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 16:33:08 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700
----------------------------------------------------------------------
.../webapp/resources/TestJobQueuesResource.java | 7 +-
.../helix/integration/task/DummyTask.java | 72 ------
.../apache/helix/integration/task/MockTask.java | 73 ++++++
.../helix/integration/task/TaskTestUtil.java | 253 +++++++++++++++++++
.../task/TestIndependentTaskRebalancer.java | 23 +-
.../integration/task/TestRecurringJobQueue.java | 157 +++---------
.../integration/task/TestTaskRebalancer.java | 72 +-----
.../task/TestTaskRebalancerFailover.java | 12 +-
.../task/TestTaskRebalancerParallel.java | 60 +----
.../task/TestTaskRebalancerRetryLimit.java | 2 +-
.../task/TestTaskRebalancerStopResume.java | 154 ++++-------
.../apache/helix/integration/task/TestUtil.java | 207 ---------------
.../integration/task/WorkflowGenerator.java | 4 +-
13 files changed, 451 insertions(+), 645 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 9c2306a..5d8a93b 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -29,7 +29,7 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.DummyTask;
+import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.Task;
@@ -42,9 +42,6 @@ import org.apache.helix.task.beans.WorkflowBean;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.webapp.AdminTestBase;
import org.apache.helix.webapp.AdminTestHelper;
-import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
-import org.apache.helix.webapp.resources.JsonParameters;
-import org.apache.helix.webapp.resources.ResourceUtil;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -81,7 +78,7 @@ public class TestJobQueuesResource extends AdminTestBase {
taskFactoryReg.put("DummyTask", new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new DummyTask(context);
+ return new MockTask(context);
}
});
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
deleted file mode 100644
index b6054d0..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-
-public class DummyTask implements Task {
- private static final String TIMEOUT_CONFIG = "Timeout";
- private final long _delay;
- private volatile boolean _canceled;
-
- public DummyTask(TaskCallbackContext context) {
- JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
- if (cfg == null) {
- cfg = Collections.emptyMap();
- }
- _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
- }
-
- @Override
- public TaskResult run() {
- long expiry = System.currentTimeMillis() + _delay;
- long timeLeft;
- while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
- : timeLeft));
- }
- sleep(50);
- }
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
- }
-
- @Override
- public void cancel() {
- _canceled = true;
- }
-
- private static void sleep(long d) {
- try {
- Thread.sleep(d);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
new file mode 100644
index 0000000..71fa12d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+public class MockTask implements Task {
+ public static final String TASK_COMMAND = "Reindex";
+ private static final String TIMEOUT_CONFIG = "Timeout";
+ private final long _delay;
+ private volatile boolean _canceled;
+
+ public MockTask(TaskCallbackContext context) {
+ JobConfig jobCfg = context.getJobConfig();
+ Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+ if (cfg == null) {
+ cfg = Collections.emptyMap();
+ }
+ _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+ }
+
+ @Override
+ public TaskResult run() {
+ long expiry = System.currentTimeMillis() + _delay;
+ long timeLeft;
+ while (System.currentTimeMillis() < expiry) {
+ if (_canceled) {
+ timeLeft = expiry - System.currentTimeMillis();
+ return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+ : timeLeft));
+ }
+ sleep(50);
+ }
+ timeLeft = expiry - System.currentTimeMillis();
+ return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+ }
+
+ @Override
+ public void cancel() {
+ _canceled = true;
+ }
+
+ private static void sleep(long d) {
+ try {
+ Thread.sleep(d);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
new file mode 100644
index 0000000..c5dd099
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -0,0 +1,253 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+
+/**
+ * Static test utility methods.
+ */
+public class TaskTestUtil {
+ private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
+
+ /**
+ * Polls {@link org.apache.helix.task.WorkflowContext} for the given workflow resource until a
+ * timeout is reached.
+ * If the workflow has not reached the target state by then, the assertion fails
+ * @param workflowResource Resource to poll for completeness
+ * @throws InterruptedException
+ */
+ public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+ TaskState state) throws InterruptedException {
+ // Wait for completion.
+ long st = System.currentTimeMillis();
+ WorkflowContext ctx;
+ do {
+ Thread.sleep(100);
+ ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+ } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+ && System.currentTimeMillis() < st + _default_timeout);
+
+ Assert.assertNotNull(ctx);
+ Assert.assertEquals(ctx.getWorkflowState(), state);
+ }
+
+ /**
+ * Polls for the job until it reaches any one of the states in targetStates or a timeout is reached.
+ * @param manager
+ * @param workflowResource
+ * @param jobName
+ * @param targetStates
+ * @throws InterruptedException
+ */
+ public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
+ TaskState... targetStates) throws InterruptedException {
+ // Get workflow config
+ WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
+ Assert.assertNotNull(wfCfg);
+ WorkflowContext ctx;
+ if (wfCfg.isRecurring()) {
+ // if it's recurring, need to reconstruct workflow and job name
+ do {
+ Thread.sleep(100);
+ ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+ } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
+ Assert.assertNotNull(ctx);
+ Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
+ jobName = jobName.substring(workflowResource.length() + 1);
+ workflowResource = ctx.getLastScheduledSingleWorkflow();
+ jobName = String.format("%s_%s", workflowResource, jobName);
+ }
+
+ Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+ // Wait for state
+ long st = System.currentTimeMillis();
+ do {
+ Thread.sleep(100);
+ ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+ }
+ while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
+ && System.currentTimeMillis() < st + _default_timeout);
+ Assert.assertNotNull(ctx);
+ Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
+ }
+
+ public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+ final String jobName) throws Exception {
+ final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
+ boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+ return ctx == null || ctx.getJobState(namespacedJobName) == null;
+ }
+ }, _default_timeout);
+ Assert.assertTrue(succeed);
+ }
+
+ public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
+ throws InterruptedException {
+ // Wait for the workflow context to become available.
+ long st = System.currentTimeMillis();
+ WorkflowContext ctx;
+ do {
+ Thread.sleep(100);
+ ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+ } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
+ Assert.assertNotNull(ctx);
+ return ctx;
+ }
+
+ // 1. Different jobs in the same workflow are RUNNING at the same time
+ // 2. No two jobs in the same workflow are RUNNING on the same instance
+ public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
+ throws InterruptedException {
+
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
+ Assert.assertNotNull(workflowConfig);
+
+ WorkflowContext workflowContext = null;
+ while (workflowContext == null) {
+ workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+ Thread.sleep(100);
+ }
+
+ int maxRunningCount = 0;
+ boolean finished = false;
+
+ while (!finished) {
+ finished = true;
+ int runningCount = 0;
+
+ workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+ for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+ TaskState jobState = workflowContext.getJobState(jobName);
+ if (jobState == TaskState.IN_PROGRESS) {
+ ++runningCount;
+ finished = false;
+ }
+ }
+
+ if (runningCount > maxRunningCount ) {
+ maxRunningCount = runningCount;
+ }
+
+ List<JobContext> jobContextList = new ArrayList<JobContext>();
+ for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+ JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
+ if (jobContext != null) {
+ jobContextList.add(TaskUtil.getJobContext(manager, jobName));
+ }
+ }
+
+ Set<String> instances = new HashSet<String>();
+ for (JobContext jobContext : jobContextList) {
+ for (int partition : jobContext.getPartitionSet()) {
+ String instance = jobContext.getAssignedParticipant(partition);
+ TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
+
+ if (instance == null) {
+ continue;
+ }
+ if (taskPartitionState != TaskPartitionState.INIT &&
+ taskPartitionState != TaskPartitionState.RUNNING) {
+ continue;
+ }
+ if (instances.contains(instance)) {
+ return false;
+ }
+
+ TaskPartitionState state = jobContext.getPartitionState(partition);
+ if (state != TaskPartitionState.COMPLETED) {
+ instances.add(instance);
+ }
+ }
+ }
+
+ Thread.sleep(100);
+ }
+
+ return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
+ }
+
+ public static Date getDateFromStartTime(String startTime)
+ {
+ int splitIndex = startTime.indexOf(':');
+ int hourOfDay = 0, minutes = 0;
+ try
+ {
+ hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+ minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+ }
+ catch (NumberFormatException e)
+ {
+
+ }
+ Calendar cal = Calendar.getInstance();
+ cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+ cal.set(Calendar.MINUTE, minutes);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTime();
+ }
+
+ public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+ Map<String, String> cfgMap = new HashMap<String, String>();
+ cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+ cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
+ cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
+ Calendar cal = Calendar.getInstance();
+ cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+ cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+ cal.set(Calendar.MILLISECOND, 0);
+ cfgMap.put(WorkflowConfig.START_TIME,
+ WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+ //cfgMap.put(WorkflowConfig.START_TIME,
+ //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+ return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+ }
+
+ public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
+ return buildRecurrentJobQueue(jobQueueName, 0);
+ }
+
+ public static boolean pollForParticipantParallelState() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 40c2485..ba8367e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -32,7 +32,6 @@ import org.apache.helix.TestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -158,8 +157,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that each class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -185,8 +184,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that each class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -214,8 +213,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that each class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -242,8 +241,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that the class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -277,7 +276,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that the class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -309,7 +308,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure completion
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure a single retry happened
JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
@@ -317,7 +316,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
}
- private class TaskOne extends ReindexTask {
+ private class TaskOne extends MockTask {
private final boolean _shouldFail;
private final String _instanceName;
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index da13ada..4e21ef7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -20,9 +20,6 @@ package org.apache.helix.integration.task;
*/
import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -52,11 +49,9 @@ import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -108,10 +103,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory() {
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new ReindexTask(context);
+ return new MockTask(context);
}
});
@@ -162,46 +157,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_manager.disconnect();
}
- private Date getDateFromStartTime(String startTime)
- {
- int splitIndex = startTime.indexOf(':');
- int hourOfDay = 0, minutes = 0;
- try
- {
- hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
- minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
- }
- catch (NumberFormatException e)
- {
-
- }
- Calendar cal = Calendar.getInstance();
- cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
- cal.set(Calendar.MINUTE, minutes);
- cal.set(Calendar.SECOND, 0);
- cal.set(Calendar.MILLISECOND, 0);
- return cal.getTime();
- }
- private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
- Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
- cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
- cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
- Calendar cal = Calendar.getInstance();
- cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
- cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
- cal.set(Calendar.MILLISECOND, 0);
- cfgMap.put(WorkflowConfig.START_TIME,
- WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
- //cfgMap.put(WorkflowConfig.START_TIME,
- //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
- return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
- }
-
- private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
- return buildRecurrentJobQueue(jobQueueName, 0);
- }
@Test
public void deleteRecreateRecurrentQueue() throws Exception {
@@ -209,14 +165,14 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
+ JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -226,25 +182,25 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.start(queueBuild.build());
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
// ensure job 1 is started before stop it
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
- TestUtil
+ TaskTestUtil
.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
_driver.stop(queueName);
_driver.delete(queueName);
Thread.sleep(500);
- JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
currentJobNames.clear();
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -255,17 +211,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.createQueue(queueBuilder.build());
- wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
// ensure jobs are started and completed
scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
- TestUtil
+ TaskTestUtil
.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1));
- TestUtil
+ TaskTestUtil
.pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
}
@@ -275,7 +231,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
@@ -285,7 +241,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setJobCommandConfigMap(commandConfig)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -296,20 +252,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
}
_driver.createQueue(queueBuilder.build());
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
// ensure job 1 is started before deleting it
String deletedJob1 = currentJobNames.get(0);
String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
- TaskState.COMPLETED);
+ TaskTestUtil
+ .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+ TaskState.COMPLETED);
// stop the queue
LOG.info("Pausing job-queue: " + scheduledQueue);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
// delete the in-progress job (job 1) and verify it being deleted
_driver.deleteJob(queueName, deletedJob1);
@@ -320,21 +277,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.resume(queueName);
// ensure job 2 is started
- TestUtil.pollForJobState(_manager, scheduledQueue,
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue,
String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
TaskState.COMPLETED);
// stop the queue
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, scheduledQueue,
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue,
String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
// Ensure job 3 is not started before deleting it
String deletedJob2 = currentJobNames.get(2);
String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
- TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+ TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
// delete not-started job (job 3) and verify it being deleted
_driver.deleteJob(queueName, deletedJob2);
@@ -350,7 +307,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
long preJobFinish = 0;
for (int i = 0; i < currentJobNames.size(); i++) {
String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
long jobStart = jobContext.getStartTime();
@@ -366,7 +323,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
// create jobs
List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -378,7 +335,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(commandConfig)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
jobs.add(job);
@@ -395,12 +352,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
String currentLastJob = jobNames.get(JOB_COUNTS - 2);
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
// ensure all jobs are finished
String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
// enqueue the last job
LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -424,17 +381,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
- JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex")
+ JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet("SLAVE"));
- JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex")
+ JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
- JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex")
+ JobConfig.Builder job3 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
@@ -445,12 +402,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.createQueue(queueBuilder.build());
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
// ensure all jobs are completed
String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
@@ -488,51 +445,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
- TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
- }
-
- public static class ReindexTask implements Task {
- private final long _delay;
- private volatile boolean _canceled;
-
- public ReindexTask(TaskCallbackContext context) {
- JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
- if (cfg == null) {
- cfg = Collections.emptyMap();
- }
- _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
- }
-
- @Override
- public TaskResult run() {
- long expiry = System.currentTimeMillis() + _delay;
- long timeLeft;
- while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
- : timeLeft));
- }
- sleep(10L);
- }
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED,
- String.valueOf(timeLeft < 0 ? 0 : timeLeft));
- }
-
- @Override
- public void cancel() {
- _canceled = true;
- }
-
- private static void sleep(long d) {
- try {
- Thread.sleep(d);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
+ TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 3352d1c..787ebcc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -45,7 +44,6 @@ import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
@@ -98,10 +96,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory() {
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new ReindexTask(context);
+ return new MockTask(context);
}
});
@@ -171,7 +169,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
.setExpiry(expiry).build();
_driver.start(flow);
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
// Running workflow should have config and context viewable through accessor
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -185,7 +183,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
// Wait for job to finish and expire
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
Thread.sleep(expiry);
TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
Thread.sleep(expiry);
@@ -215,7 +213,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(flow);
// Wait for job completion
- TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
// Ensure all partitions are completed individually
JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -242,7 +240,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(flow);
// wait for job completeness/timeout
- TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
// see if resulting context completed successfully for our partition set
String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
@@ -267,11 +265,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
new TaskDriver(_manager).start(flow);
// Wait until the workflow completes
- TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
// Assert completion for all tasks within two minutes
for (String task : flow.getJobConfigs().keySet()) {
- TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
}
}
@@ -287,7 +285,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(flow);
// Wait until the job reports failure.
- TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
// Check that all partitions timed out up to maxAttempts
JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -314,10 +312,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
Set<String> master = Sets.newHashSet("MASTER");
Set<String> slave = Sets.newHashSet("SLAVE");
JobConfig.Builder job1 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
JobConfig.Builder job2 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
_driver.enqueueJob(queueName, "masterJob", job1);
_driver.enqueueJob(queueName, "slaveJob", job2);
@@ -325,8 +323,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
// Ensure successful completion
String namespacedJob1 = queueName + "_masterJob";
String namespacedJob2 = queueName + "_slaveJob";
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
@@ -352,48 +350,4 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
}
-
- private static class ReindexTask implements Task {
- private final long _delay;
- private volatile boolean _canceled;
-
- public ReindexTask(TaskCallbackContext context) {
- JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
- if (cfg == null) {
- cfg = Collections.emptyMap();
- }
- _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
- }
-
- @Override
- public TaskResult run() {
- long expiry = System.currentTimeMillis() + _delay;
- long timeLeft;
- while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
- : timeLeft));
- }
- sleep(50);
- }
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED,
- String.valueOf(timeLeft < 0 ? 0 : timeLeft));
- }
-
- @Override
- public void cancel() {
- _canceled = true;
- }
-
- private static void sleep(long d) {
- try {
- Thread.sleep(d);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index b8e1c09..6f1c48e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -82,10 +82,10 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("DummyTask", new TaskFactory() {
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new DummyTask(context);
+ return new MockTask(context);
}
});
@@ -143,7 +143,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
// Enqueue jobs
Set<String> master = Sets.newHashSet("MASTER");
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("DummyTask")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
String job1Name = "masterJob";
LOG.info("Enqueuing job: " + job1Name);
@@ -151,7 +151,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
// check all tasks completed on MASTER
String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -178,9 +178,9 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
LOG.info("Enqueuing job: " + job2Name);
_driver.enqueueJob(queueName, job2Name, job);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
_participants[0].syncStop();
- TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
// tasks previously assigned to localhost_12918 should be re-scheduled on new master
ctx = TaskUtil.getJobContext(_manager, namespacedJob2);
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 2ff8c56..5180a04 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -26,31 +26,21 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -58,9 +48,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
private static final int n = 5;
private static final int START_PORT = 12918;
@@ -105,10 +92,10 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
final long delay = (i + 1) * 1000L;
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory() {
+ taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
- return new ReindexTask(delay);
+ return new MockTask(context);
}
});
@@ -164,7 +151,7 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
for (String testDbName : testDbNames) {
jobConfigBuilders.add(
- new JobConfig.Builder().setCommand("Reindex").setTargetResource(testDbName)
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
.setTargetPartitionStates(Collections.singleton("SLAVE")));
}
@@ -172,45 +159,6 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
_driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
}
- Assert.assertTrue(TestUtil.pollForWorkflowParallelState(_manager, queueName));
- }
-
- public static class ReindexTask implements Task {
- private final long _delay;
- private volatile boolean _canceled;
-
- public ReindexTask(long delay) {
- _delay = delay;
- }
-
- @Override
- public TaskResult run() {
- long expiry = System.currentTimeMillis() + _delay;
- long timeLeft;
- while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
- : timeLeft));
- }
- sleep(50);
- }
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED,
- String.valueOf(timeLeft < 0 ? 0 : timeLeft));
- }
-
- @Override
- public void cancel() {
- _canceled = true;
- }
-
- private static void sleep(long d) {
- try {
- Thread.sleep(d);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
+ Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_manager, queueName));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index efe90b0..d25ffc5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -138,7 +138,7 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
_driver.start(flow);
// Wait until the job completes.
- TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
for (int i = 0; i < _p; i++) {
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 7437b72..b67fa90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration.task;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -41,7 +40,6 @@ import org.apache.helix.TestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -53,7 +51,6 @@ import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
@@ -62,7 +59,6 @@ import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.util.PathUtils;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -108,12 +104,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory() {
- @Override
- public Task createNewTask(TaskCallbackContext context) {
- return new ReindexTask(context);
- }
- });
+ taskFactoryReg
+ .put(MockTask.TASK_COMMAND, new TaskFactory() {
+ @Override public Task createNewTask(TaskCallbackContext context) {
+ return new MockTask(context);
+ }
+ });
// start dummy participants
for (int i = 0; i < n; i++) {
@@ -173,15 +169,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
LOG.info("Starting flow " + flow.getName());
_driver.start(flow);
- TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
LOG.info("Pausing job");
_driver.stop(JOB_RESOURCE);
- TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
LOG.info("Resuming job");
_driver.resume(JOB_RESOURCE);
- TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
}
@Test
@@ -191,15 +187,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
LOG.info("Starting flow " + workflow);
_driver.start(flow);
- TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
LOG.info("Pausing workflow");
_driver.stop(workflow);
- TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
LOG.info("Resuming workflow");
_driver.resume(workflow);
- TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+ TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
}
@Test
@@ -214,7 +210,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
// Enqueue jobs
Set<String> master = Sets.newHashSet("MASTER");
JobConfig.Builder job1 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
String job1Name = "masterJob";
LOG.info("Enqueuing job: " + job1Name);
@@ -222,32 +218,32 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
Set<String> slave = Sets.newHashSet("SLAVE");
JobConfig.Builder job2 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
String job2Name = "slaveJob";
LOG.info("Enqueuing job: " + job2Name);
_driver.enqueueJob(queueName, job2Name, job2);
String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
// stop job1
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
// Ensure job2 is not started
TimeUnit.MILLISECONDS.sleep(200);
String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
- TestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+ TaskTestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
LOG.info("Resuming job-queue: " + queueName);
_driver.resume(queueName);
// Ensure successful completion
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
@@ -282,7 +278,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -294,13 +290,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
// ensure job 1 is started before deleting it
String deletedJob1 = currentJobNames.get(0);
String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
- TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+ TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
// stop the queue
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
// delete the in-progress job (job 1) and verify it being deleted
_driver.deleteJob(queueName, deletedJob1);
@@ -310,22 +306,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
_driver.resume(queueName);
// ensure job 2 is started
- TestUtil.pollForJobState(_manager, queueName,
+ TaskTestUtil.pollForJobState(_manager, queueName,
String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
// stop the queue
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager,
- queueName,
- String.format("%s_%s", queueName, currentJobNames.get(1)),
- TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+ TaskTestUtil.pollForJobState(_manager, queueName,
+ String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
// Ensure job 3 is not started before deleting it
String deletedJob2 = currentJobNames.get(2);
String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
- TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+ TaskTestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
// delete not-started job (job 3) and verify it being deleted
_driver.deleteJob(queueName, deletedJob2);
@@ -339,7 +333,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
// add job 3 back
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet("SLAVE"));
LOG.info("Enqueuing job: " + deletedJob2);
@@ -350,7 +344,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
long preJobFinish = 0;
for (int i = 0; i < currentJobNames.size(); i++) {
String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
- TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
long jobStart = jobContext.getStartTime();
@@ -398,7 +392,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setJobCommandConfigMap(commandConfig)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -408,19 +402,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
currentJobNames.add(i, jobName);
}
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
// ensure job 1 is started before deleting it
String deletedJob1 = currentJobNames.get(0);
String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+ TaskTestUtil
+ .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
// stop the queue
LOG.info("Pausing job-queue: " + scheduledQueue);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
// delete the in-progress job (job 1) and verify it being deleted
_driver.deleteJob(queueName, deletedJob1);
@@ -431,20 +426,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
_driver.resume(queueName);
// ensure job 2 is started
- TestUtil.pollForJobState(_manager, scheduledQueue,
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue,
String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
// stop the queue
LOG.info("Pausing job-queue: " + queueName);
_driver.stop(queueName);
- TestUtil.pollForJobState(_manager, scheduledQueue,
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue,
String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
- TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+ TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
// Ensure job 3 is not started before deleting it
String deletedJob2 = currentJobNames.get(2);
String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
- TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+ TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
// delete not-started job (job 3) and verify it being deleted
_driver.deleteJob(queueName, deletedJob2);
@@ -460,7 +455,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
long preJobFinish = 0;
for (int i = 0; i < currentJobNames.size(); i++) {
String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
long jobStart = jobContext.getStartTime();
@@ -489,10 +484,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
for (int i = 0; i < JOB_COUNTS; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
- JobConfig.Builder job =
- new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
jobs.add(job);
jobNames.add(targetPartition.toLowerCase() + "Job" + i);
}
@@ -504,12 +498,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
}
String currentLastJob = jobNames.get(JOB_COUNTS - 2);
- WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
// ensure all jobs are finished
String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
- TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
// enqueue the last job
LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -537,7 +531,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
// Enqueue 2 jobs
Set<String> master = Sets.newHashSet("MASTER");
JobConfig.Builder job1 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
String job1Name = "masterJob";
LOG.info("Enqueuing job1: " + job1Name);
@@ -545,17 +539,17 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
Set<String> slave = Sets.newHashSet("SLAVE");
JobConfig.Builder job2 =
- new JobConfig.Builder().setCommand("Reindex")
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
String job2Name = "slaveJob";
LOG.info("Enqueuing job2: " + job2Name);
_driver.enqueueJob(queueName, job2Name, job2);
String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
- TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+ TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
// Stop queue
_driver.stop(queueName);
@@ -605,7 +599,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
- TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+ TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
}
private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
@@ -615,48 +609,4 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
}
-
- public static class ReindexTask implements Task {
- private final long _delay;
- private volatile boolean _canceled;
-
- public ReindexTask(TaskCallbackContext context) {
- JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
- if (cfg == null) {
- cfg = Collections.emptyMap();
- }
- _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
- }
-
- @Override
- public TaskResult run() {
- long expiry = System.currentTimeMillis() + _delay;
- long timeLeft;
- while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
- : timeLeft));
- }
- sleep(50);
- }
- timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED,
- String.valueOf(timeLeft < 0 ? 0 : timeLeft));
- }
-
- @Override
- public void cancel() {
- _canceled = true;
- }
-
- private static void sleep(long d) {
- try {
- Thread.sleep(d);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
deleted file mode 100644
index d40ac89..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-
-/**
- * Static test utility methods.
- */
-public class TestUtil {
- private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
-
- /**
- * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
- * reached.
- * If the task has not reached target state by then, an error is thrown
- * @param workflowResource Resource to poll for completeness
- * @throws InterruptedException
- */
- public static void pollForWorkflowState(HelixManager manager, String workflowResource,
- TaskState state) throws InterruptedException {
- // Wait for completion.
- long st = System.currentTimeMillis();
- WorkflowContext ctx;
- do {
- Thread.sleep(100);
- ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
- } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
- && System.currentTimeMillis() < st + _default_timeout);
-
- Assert.assertNotNull(ctx);
- Assert.assertEquals(ctx.getWorkflowState(), state);
- }
-
- /**
- * poll for job until it is at either state in targetStates.
- * @param manager
- * @param workflowResource
- * @param jobName
- * @param targetStates
- * @throws InterruptedException
- */
- public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
- TaskState... targetStates) throws InterruptedException {
- // Get workflow config
- WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
- Assert.assertNotNull(wfCfg);
- WorkflowContext ctx;
- if (wfCfg.isRecurring()) {
- // if it's recurring, need to reconstruct workflow and job name
- do {
- Thread.sleep(100);
- ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
- } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
- Assert.assertNotNull(ctx);
- Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
- jobName = jobName.substring(workflowResource.length() + 1);
- workflowResource = ctx.getLastScheduledSingleWorkflow();
- jobName = String.format("%s_%s", workflowResource, jobName);
- }
-
- Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
- // Wait for state
- long st = System.currentTimeMillis();
- do {
- Thread.sleep(100);
- ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
- }
- while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
- && System.currentTimeMillis() < st + _default_timeout);
- Assert.assertNotNull(ctx);
- Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
- }
-
- public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
- final String jobName) throws Exception {
- final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
- boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
-
- @Override
- public boolean verify() throws Exception {
- WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
- return ctx == null || ctx.getJobState(namespacedJobName) == null;
- }
- }, _default_timeout);
- Assert.assertTrue(succeed);
- }
-
- public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
- throws InterruptedException {
- // Wait for completion.
- long st = System.currentTimeMillis();
- WorkflowContext ctx;
- do {
- Thread.sleep(100);
- ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
- } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
- Assert.assertNotNull(ctx);
- return ctx;
- }
-
- // 1. Different jobs in a same work flow is in RUNNING at the same time
- // 2. No two jobs in the same work flow is in RUNNING at the same instance
- public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
- throws InterruptedException {
-
- WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
- Assert.assertNotNull(workflowConfig);
-
- WorkflowContext workflowContext = null;
- while (workflowContext == null) {
- workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
- Thread.sleep(100);
- }
-
- int maxRunningCount = 0;
- boolean finished = false;
-
- while (!finished) {
- finished = true;
- int runningCount = 0;
-
- workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
- for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
- TaskState jobState = workflowContext.getJobState(jobName);
- if (jobState == TaskState.IN_PROGRESS) {
- ++runningCount;
- finished = false;
- }
- }
-
- if (runningCount > maxRunningCount ) {
- maxRunningCount = runningCount;
- }
-
- List<JobContext> jobContextList = new ArrayList<JobContext>();
- for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
- JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
- if (jobContext != null) {
- jobContextList.add(TaskUtil.getJobContext(manager, jobName));
- }
- }
-
- Set<String> instances = new HashSet<String>();
- for (JobContext jobContext : jobContextList) {
- for (int partition : jobContext.getPartitionSet()) {
- String instance = jobContext.getAssignedParticipant(partition);
- TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
-
- if (instance == null) {
- continue;
- }
- if (taskPartitionState != TaskPartitionState.INIT &&
- taskPartitionState != TaskPartitionState.RUNNING) {
- continue;
- }
- if (instances.contains(instance)) {
- return false;
- }
-
- TaskPartitionState state = jobContext.getPartitionState(partition);
- if (state != TaskPartitionState.COMPLETED) {
- instances.add(instance);
- }
- }
- }
-
- Thread.sleep(100);
- }
-
- return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
- }
-
- public static boolean pollForParticipantParallelState() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 23c35af..ce3a36a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
* under the License.
*/
-import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
@@ -27,7 +26,6 @@ import java.util.TreeMap;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.Workflow;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
/**
* Convenience class for generating various test workflows
@@ -44,7 +42,7 @@ public class WorkflowGenerator {
Map<String, String> tmpMap = new TreeMap<String, String>();
tmpMap.put("TargetResource", DEFAULT_TGT_DB);
tmpMap.put("TargetPartitionStates", "MASTER");
- tmpMap.put("Command", "Reindex");
+ tmpMap.put("Command", MockTask.TASK_COMMAND);
tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
}
[4/4] helix git commit: [HELIX-622] Add new resource configuration
option to allow resource to disable emitting monitoring bean.
Posted by lx...@apache.org.
[HELIX-622] Add new resource configuration option to allow resource to disable emitting monitoring bean.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24096019
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24096019
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24096019
Branch: refs/heads/helix-0.6.x
Commit: 24096019375e13c439ec6bfa83088ba7c25ffaf9
Parents: 6b6bb8f
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jan 8 17:14:00 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixConstants.java | 5 +-
.../main/java/org/apache/helix/PropertyKey.java | 6 +-
.../stages/BestPossibleStateCalcStage.java | 1 +
.../controller/stages/ClusterDataCache.java | 37 +++++--
.../stages/ExternalViewComputeStage.java | 20 ++--
.../org/apache/helix/model/ResourceConfig.java | 106 ++++++++++++++++++
.../mbeans/TestDisableResourceMbean.java | 109 +++++++++++++++++++
7 files changed, 264 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 6b6287c..5318fa9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -44,8 +44,9 @@ public interface HelixConstants {
}
enum ClusterConfigType {
- HELIX_DISABLE_PIPELINE_TRIGGERS
+ HELIX_DISABLE_PIPELINE_TRIGGERS,
+ DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO
}
- static final String DEFAULT_STATE_MODEL_FACTORY = "DEFAULT";
+ String DEFAULT_STATE_MODEL_FACTORY = "DEFAULT";
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 663e831..33355f1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -50,8 +50,10 @@ import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.tools.YAMLClusterSetup;
import org.apache.log4j.Logger;
/**
@@ -212,7 +214,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey resourceConfigs() {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfig.class,
_clusterName, ConfigScopeProperty.RESOURCE.toString());
}
@@ -222,7 +224,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey resourceConfig(String resourceName) {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfig.class,
_clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 9a9767e..f12b6e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
import java.util.Map;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 7144077..fde4959 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -61,6 +62,8 @@ public class ClusterDataCache {
Map<String, StateModelDefinition> _stateModelDefMap;
Map<String, InstanceConfig> _instanceConfigMap;
Map<String, InstanceConfig> _instanceConfigCacheMap;
+ Map<String, ResourceConfig> _resourceConfigMap;
+ Map<String, ResourceConfig> _resourceConfigCacheMap;
Map<String, ClusterConstraints> _constraintMap;
Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
Map<String, Map<String, Message>> _messageMap;
@@ -89,10 +92,15 @@ public class ClusterDataCache {
_idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
_liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
_instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ _resourceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
}
_idealStateMap = Maps.newHashMap(_idealStateCacheMap);
_liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
_instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
+ _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+
+ _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+ _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
if (LOG.isTraceEnabled()) {
for (LiveInstance instance : _liveInstanceMap.values()) {
@@ -100,9 +108,6 @@ public class ClusterDataCache {
}
}
- _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
-
Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
List<PropertyKey> newMessageKeys = Lists.newLinkedList();
long purgeSum = 0;
@@ -216,10 +221,9 @@ public class ClusterDataCache {
LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
if (LOG.isDebugEnabled()) {
- int numPaths =
- _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size()
- + _instanceConfigMap.size() + _constraintMap.size() + newMessageKeys.size()
- + currentStateKeys.size();
+ int numPaths = _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size()
+ + _instanceConfigMap.size() + _resourceConfigMap.size() + _constraintMap.size()
+ + newMessageKeys.size() + currentStateKeys.size();
LOG.debug("Paths read: " + numPaths);
}
@@ -341,6 +345,24 @@ public class ClusterDataCache {
return _instanceConfigMap;
}
+ /**
+ * Returns the resource config map
+ *
+ * @return
+ */
+ public Map<String, ResourceConfig> getResourceConfigMap() {
+ return _resourceConfigMap;
+ }
+
+ /**
+ * Returns the config of the given resource
+ *
+ * @return the cached config for the resource, or null if there is none
+ */
+ public ResourceConfig getResourceConfig(String resource) {
+ return _resourceConfigMap.get(resource);
+ }
+
public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
for (InstanceConfig instanceConfig : instanceConfigs) {
@@ -424,6 +446,7 @@ public class ClusterDataCache {
sb.append("idealStateMap:" + _idealStateMap).append("\n");
sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+ sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n");
sb.append("messageMap:" + _messageMap).append("\n");
return sb.toString();
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 1455cd5..d83518d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,6 +44,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -104,20 +105,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
}
}
// Update cluster status monitor mbean
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ ClusterStatusMonitor clusterStatusMonitor = event.getAttribute("clusterStatusMonitor");
IdealState idealState = cache._idealStateMap.get(resourceName);
- if (idealState != null) {
- if (clusterStatusMonitor != null
- && !idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
+ if (idealState != null && (resourceConfig == null || !resourceConfig
+ .isMonitoringDisabled())) {
+ if (clusterStatusMonitor != null && !idealState.getStateModelDefRef()
+ .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
StateModelDefinition stateModelDef =
cache.getStateModelDef(idealState.getStateModelDefRef());
- clusterStatusMonitor.setResourceStatus(view,
- cache._idealStateMap.get(view.getResourceName()), stateModelDef);
+ clusterStatusMonitor
+ .setResourceStatus(view, cache._idealStateMap.get(view.getResourceName()),
+ stateModelDef);
}
} else {
- // Drop the metrics for the dropped resource
+ // Drop the metrics if the resource is dropped or its monitoring has been disabled.
clusterStatusMonitor.unregisterResource(view.getResourceName());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
new file mode 100644
index 0000000..98433f5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -0,0 +1,106 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+/**
+ * Resource configurations
+ */
+public class ResourceConfig extends HelixProperty {
+ /**
+ * Configurable characteristics of a resource
+ */
+ public enum ResourceConfigProperty {
+ MONITORING_DISABLED, // Resource-level config, do not create Mbean and report any status for the resource.
+ }
+
+ private static final Logger _logger = Logger.getLogger(ResourceConfig.class.getName());
+
+ /**
+ * Instantiate for a specific resource
+ *
+ * @param resourceId the resource identifier
+ */
+ public ResourceConfig(String resourceId) {
+ super(resourceId);
+ }
+
+ /**
+ * Instantiate with a pre-populated record
+ *
+ * @param record a ZNRecord corresponding to a resource configuration
+ */
+ public ResourceConfig(ZNRecord record) {
+ super(record);
+ }
+
+ /**
+ * Get whether monitoring is disabled for this resource.
+ *
+ * @return true if monitoring is disabled for this resource; false otherwise (the default)
+ */
+ public Boolean isMonitoringDisabled() {
+ return _record.getBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), false);
+ }
+
+ /**
+ * Set whether to disable monitoring for this resource.
+ *
+ * @param monitoringDisabled whether to disable monitoring for this resource.
+ */
+ public void setMonitoringDisabled(boolean monitoringDisabled) {
+ _record
+ .setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ResourceConfig) {
+ ResourceConfig that = (ResourceConfig) obj;
+
+ if (this.getId().equals(that.getId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ /**
+ * Get the name of this resource
+ *
+ * @return the resource name
+ */
+ public String getResourceName() {
+ return _record.getId();
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/24096019/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
new file mode 100644
index 0000000..6d67fc7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -0,0 +1,109 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class TestDisableResourceMbean extends ZkUnitTestBase {
+ private MBeanServerConnection _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ @Test public void testDisableResourceMonitoring() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 3, // resources
+ 32, // partitions per resource
+ 4, // number of nodes
+ 1, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+ true); // do rebalance
+
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ participants[i] =
+ new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
+ participants[i].syncStart();
+ }
+
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ HelixConfigScope resourceScope =
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+ .forCluster(clusterName).forResource("TestDB1").build();
+ configAccessor
+ .set(resourceScope, ResourceConfig.ResourceConfigProperty.MONITORING_DISABLED.name(),
+ "true");
+
+ resourceScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+ .forCluster(clusterName).forResource("TestDB2").build();
+ configAccessor
+ .set(resourceScope, ResourceConfig.ResourceConfigProperty.MONITORING_DISABLED.name(),
+ "false");
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ Thread.sleep(300);
+
+ // Verify the beans were created for TestDB0 and TestDB2, but not for TestDB1.
+ Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));
+ Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", clusterName)));
+ Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", clusterName)));
+
+ controller.syncStop();
+ for (MockParticipantManager participant : participants) {
+ participant.syncStop();
+ }
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ private ObjectName getMbeanName(String resourceName, String clusterName)
+ throws MalformedObjectNameException {
+ String clusterBeanName =
+ String.format("%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, clusterName);
+ String resourceBeanName = String
+ .format("%s,%s=%s", clusterBeanName, ClusterStatusMonitor.RESOURCE_DN_KEY, resourceName);
+ return new ObjectName(
+ String.format("%s: %s", ClusterStatusMonitor.CLUSTER_STATUS_KEY, resourceBeanName));
+ }
+}