You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/11/30 18:19:04 UTC
[1/3] helix git commit: [HELIX-614] Fix the bug when job expiry time
is shorter than job schedule interval in recurring job queue.
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 456ddb0c4 -> 7bbb20be6
[HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d129d3ab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d129d3ab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d129d3ab
Branch: refs/heads/helix-0.6.x
Commit: d129d3ab780adb1ff41fe5a0bfb3dafd7d5068a3
Parents: 456ddb0
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:38:31 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:38:31 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 1 +
.../org/apache/helix/task/TaskRebalancer.java | 35 +++++++++++++++++---
.../java/org/apache/helix/task/TaskUtil.java | 1 +
.../java/org/apache/helix/task/Workflow.java | 17 +++++++---
.../integration/task/TestRecurringJobQueue.java | 32 +++++++++++++++---
5 files changed, 74 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 30f76b7..c7c2f38 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -217,6 +217,7 @@ public class JobConfig {
cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
+ cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
return cfgMap;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2ff8b8c..5a86c3d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -113,10 +114,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
final String resourceName = resource.getResourceName();
+ LOG.debug("Computer Best Partition for resource: " + resourceName);
// Fetch job configuration
JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
if (jobCfg == null) {
+ LOG.debug("Job configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
}
String workflowResource = jobCfg.getWorkflow();
@@ -124,6 +127,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Fetch workflow configuration and context
WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
if (workflowCfg == null) {
+ LOG.debug("Workflow configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
}
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
@@ -132,6 +136,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
workflowCtx.setStartTime(System.currentTimeMillis());
+ LOG.info("Workflow context for " + resourceName + " created!");
}
// check ancestor job status
@@ -147,12 +152,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
}
if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+ LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
}
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
+ LOG.info(
+ "Workflow is marked as deleted " + workflowResource
+ + " cleaning up the workflow context.");
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
}
@@ -160,6 +169,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
&& workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+ LOG.info("Workflow " + workflowResource
+ + " is completed and passed expiry time, cleaning up the workflow context.");
markForDeletion(_manager, workflowResource);
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
@@ -176,6 +187,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
long jobFinishTime = jobCtx.getFinishTime();
if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
&& jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+ LOG.info("Job " + resourceName
+ + " is completed and passed expiry time, cleaning up the job context.");
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
}
@@ -183,6 +196,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// The job is already in a final state (completed/failed).
if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
|| workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
+ LOG.debug("Job " + resourceName + " is failed or already completed.");
return emptyAssignment(resourceName, currStateOutput);
}
@@ -190,6 +204,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
boolean isReady =
scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
if (!isReady) {
+ LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
return emptyAssignment(resourceName, currStateOutput);
}
@@ -224,6 +239,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
+ LOG.debug("Job " + resourceName + " new assignment " + Arrays
+ .toString(newAssignment.getMappedPartitions().toArray()));
+
return newAssignment;
}
@@ -529,6 +547,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Remove any timers that are past-time for this workflow
Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+ LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
SCHEDULED_TIMES.remove(workflowResource);
}
@@ -536,6 +555,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (scheduleConfig.isRecurring()) {
// Skip scheduling this workflow if it's not in a start state
if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+ LOG.debug(
+ "Skip scheduling since the workflow has not been started " + workflowResource);
return false;
}
@@ -543,8 +564,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
if (lastScheduled != null) {
WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
- if (lastWorkflowCtx == null
- || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ if (lastWorkflowCtx != null
+ && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+ LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
return false;
}
}
@@ -559,7 +581,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Now clone the workflow if this clone has not yet been created
String newWorkflowName =
workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
- if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) {
+ LOG.debug("Ready to start workflow " + newWorkflowName);
+ if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
timeToSchedule));
@@ -592,9 +615,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// Do nothing if there is already a timer set for the this workflow with the same start time.
if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
|| SCHEDULED_TIMES.inverse().containsKey(startTime)) {
+ LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
return;
}
- LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource);
+ LOG.info(
+ "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
+ + " delay from start: " + delayFromStart);
// For workflows not yet scheduled, schedule them and record it
RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
@@ -607,6 +633,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
long currentTime = now;
Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+ LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
SCHEDULED_TIMES.remove(jobResource);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 2235b80..bb62de5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -349,6 +349,7 @@ public class TaskUtil {
IdealState is = accessor.getProperty(key);
if (is != null) {
accessor.updateProperty(key, is);
+ LOG.debug("invoke rebalance for " + key);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 4ca6e68..f69605e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -26,8 +26,10 @@ import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.task.beans.JobBean;
@@ -192,10 +194,17 @@ public class Workflow {
*/
public void validate() {
// validate dag and configs
- if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
- throw new IllegalArgumentException("Nodes specified in DAG missing from config");
- } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
- throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+ Set<String> jobNamesInConfig = new HashSet<String>(_jobConfigs.keySet());
+ Set<String> jobNamesInDag = new HashSet<String>(_workflowConfig.getJobDag().getAllNodes());
+ if (!jobNamesInConfig.equals(jobNamesInDag)) {
+ Set<String> jobNamesInConfigButNotInDag = new HashSet<String>(jobNamesInConfig);
+ jobNamesInConfigButNotInDag.removeAll(jobNamesInDag);
+ Set<String> jobNamesInDagButNotInConfig = new HashSet<String>(jobNamesInDag);
+ jobNamesInDagButNotInConfig.removeAll(jobNamesInConfig);
+
+ throw new IllegalArgumentException(
+ "Job Names dismatch. Names in config but not in dag: " + jobNamesInConfigButNotInDag +
+ ", names in dag but not in config: " + jobNamesInDagButNotInConfig);
}
_workflowConfig.getJobDag().validate();
http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4656a23..deca8a7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.task;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;;
@@ -157,10 +158,31 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_manager.disconnect();
}
+ private Date getDateFromStartTime(String startTime)
+ {
+ int splitIndex = startTime.indexOf(':');
+ int hourOfDay = 0, minutes = 0;
+ try
+ {
+ hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+ minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+ }
+ catch (NumberFormatException e)
+ {
+
+ }
+ Calendar cal = Calendar.getInstance();
+ cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+ cal.set(Calendar.MINUTE, minutes);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ return cal.getTime();
+ }
+
private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(500000));
- cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120));
+ cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+ cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -168,6 +190,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
cal.set(Calendar.MILLISECOND, 0);
cfgMap.put(WorkflowConfig.START_TIME,
WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+ //cfgMap.put(WorkflowConfig.START_TIME,
+ //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
}
@@ -186,7 +210,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i <= 2; i++) {
+ for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
@@ -213,7 +237,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
queue = buildRecurrentJobQueue(queueName, 5);
_driver.createQueue(queue);
currentJobNames.clear();
- for (int i = 0; i <= 2; i++) {
+ for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
JobConfig.Builder job =
[2/3] helix git commit: [HELIX-615] Naming problem of scheduled jobs
from recurrent queue.
Posted by ki...@apache.org.
[HELIX-615] Naming problem of scheduled jobs from recurrent queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7569a0a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7569a0a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7569a0a7
Branch: refs/heads/helix-0.6.x
Commit: 7569a0a7b999fb6675447919d91e756200364ff5
Parents: d129d3a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:50:30 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:50:30 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskConstants.java | 4 ----
.../main/java/org/apache/helix/task/TaskDriver.java | 16 +++++++++-------
.../java/org/apache/helix/task/TaskRebalancer.java | 7 +++++--
.../integration/task/TestRecurringJobQueue.java | 2 +-
4 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 34008d6..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -39,8 +39,4 @@ public class TaskConstants {
* The root property store path at which the {@link TaskRebalancer} stores context information.
*/
public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
- /**
- * Resource prefix for scheduled workflows
- */
- public static final String SCHEDULED = "SCHEDULED";
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b4b94f8..cc1eac1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -570,13 +570,15 @@ public class TaskDriver {
private void setWorkflowTargetState(String workflowName, TargetState state) {
setSingleWorkflowTargetState(workflowName, state);
- // For recurring schedules, child workflows must also be handled
- List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
- String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
- for (String resource : resources) {
- if (resource.startsWith(prefix)) {
- setSingleWorkflowTargetState(resource, state);
- }
+ // For recurring schedules, last scheduled incomplete workflow must also be handled
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
+ String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+ WorkflowContext lastScheduledWorkflowCtx =
+ TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+ if (lastScheduledWorkflowCtx != null &&
+ !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+ || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 5a86c3d..3842b66 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,6 +19,8 @@ package org.apache.helix.task;
* under the License.
*/
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -579,8 +581,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
// Now clone the workflow if this clone has not yet been created
- String newWorkflowName =
- workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
+ DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
+ // Now clone the workflow if this clone has not yet been created
+ String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
LOG.debug("Ready to start workflow " + newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index deca8a7..eef1ce6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;;
+import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
[3/3] helix git commit: [HELIX-616] Change JobQueue to be subclass of
Workflow instead of WorkflowConfig.
Posted by ki...@apache.org.
[HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bbb20be
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bbb20be
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bbb20be
Branch: refs/heads/helix-0.6.x
Commit: 7bbb20be67a939a57f33d8f6d7c814b1dc246575
Parents: 7569a0a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:54:34 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:54:34 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobQueue.java | 66 ++++++++++----------
.../java/org/apache/helix/task/TaskDriver.java | 42 ++++++++-----
.../java/org/apache/helix/task/TaskRunner.java | 5 +-
.../org/apache/helix/task/TaskStateModel.java | 8 +--
.../java/org/apache/helix/task/Workflow.java | 55 +++++++++++-----
.../org/apache/helix/task/WorkflowConfig.java | 14 +++++
.../task/TestIndependentTaskRebalancer.java | 1 -
.../integration/task/TestRecurringJobQueue.java | 52 ++++++++-------
.../task/TestTaskRebalancerParallel.java | 4 +-
9 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index bca5911..0280c88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -19,31 +19,26 @@ package org.apache.helix.task;
* under the License.
*/
+import org.apache.helix.HelixException;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
* A named queue to which jobs can be added
*/
-public class JobQueue extends WorkflowConfig {
+public class JobQueue extends Workflow {
/* Config fields */
public static final String CAPACITY = "CAPACITY";
- private final String _name;
private final int _capacity;
- private JobQueue(String name, int capacity, WorkflowConfig config) {
- super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(),
- config.isTerminable(), config.getScheduleConfig());
- _name = name;
+ private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+ Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
+ super(name, workflowConfig, jobConfigs, taskConfigs);
_capacity = capacity;
- }
-
- /**
- * Get the name of this queue
- * @return queue name
- */
- public String getName() {
- return _name;
+ validate();
}
/**
@@ -54,31 +49,24 @@ public class JobQueue extends WorkflowConfig {
return _capacity;
}
- @Override
public Map<String, String> getResourceConfigMap() throws Exception {
- Map<String, String> cfgMap = super.getResourceConfigMap();
+ Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
cfgMap.put(CAPACITY, String.valueOf(_capacity));
return cfgMap;
}
- /** Supports creation of a single empty queue */
- public static class Builder {
- private WorkflowConfig.Builder _builder;
- private final String _name;
+ /** Supports creation of a single queue */
+ public static class Builder extends Workflow.Builder {
private int _capacity = Integer.MAX_VALUE;
+ private List<String> jobs;
public Builder(String name) {
- _builder = new WorkflowConfig.Builder();
- _name = name;
- }
-
- public Builder parallelJobs(int parallelJobs) {
- _builder.setParallelJobs(parallelJobs);
- return this;
+ super(name);
+ jobs = new ArrayList<String>();
}
public Builder expiry(long expiry) {
- _builder.setExpiry(expiry);
+ _expiry = expiry;
return this;
}
@@ -87,18 +75,32 @@ public class JobQueue extends WorkflowConfig {
return this;
}
+ @Override
public Builder fromMap(Map<String, String> cfg) {
- _builder = WorkflowConfig.Builder.fromMap(cfg);
+ super.fromMap(cfg);
if (cfg.containsKey(CAPACITY)) {
_capacity = Integer.parseInt(cfg.get(CAPACITY));
}
return this;
}
+ public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+ if (jobs.size() >= _capacity) {
+ throw new HelixException("Failed to push new job to jobQueue, it is already full");
+ }
+ addJobConfig(job, jobBuilder);
+ if (jobs.size() > 0) {
+ String previousJob = jobs.get(jobs.size() - 1);
+ addParentChildDependency(previousJob, job);
+ }
+ jobs.add(job);
+ }
+
public JobQueue build() {
- _builder.setTerminable(false);
- WorkflowConfig workflowConfig = _builder.build();
- return new JobQueue(_name, _capacity, workflowConfig);
+ WorkflowConfig.Builder builder = buildWorkflowConfig();
+ builder.setTerminable(false);
+ WorkflowConfig workflowConfig = builder.build();
+ return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cc1eac1..654ba4e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -191,8 +191,8 @@ public class TaskDriver {
String flowName = flow.getName();
// first, add workflow config to ZK
- _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow
- .getWorkflowConfig().getResourceConfigMap());
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+ flow.getWorkflowConfig().getResourceConfigMap());
// then schedule jobs
for (String job : flow.getJobConfigs().keySet()) {
@@ -206,14 +206,7 @@ public class TaskDriver {
/** Creates a new named job queue (workflow) */
public void createQueue(JobQueue queue) throws Exception {
- String queueName = queue.getName();
- HelixProperty property = new HelixProperty(queueName);
- property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
- boolean created =
- _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property);
- if (!created) {
- throw new IllegalArgumentException("Queue " + queueName + " already exists!");
- }
+ start(queue);
}
/** Flushes a named job queue */
@@ -566,20 +559,35 @@ public class TaskDriver {
setWorkflowTargetState(workflow, TargetState.DELETE);
}
- /** Helper function to change target state for a given workflow */
+ /**
+ * Helper function to change target state for a given workflow
+ */
private void setWorkflowTargetState(String workflowName, TargetState state) {
setSingleWorkflowTargetState(workflowName, state);
+ // TODO: this is the temporary fix for current task rebalance implementation.
+ // We should fix this in new task framework implementation.
+ List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
+ for (String resource : resources) {
+ if (resource.startsWith(workflowName)) {
+ setSingleWorkflowTargetState(resource, state);
+ }
+ }
+
+ /* TODO: use this code for new task framework.
// For recurring schedules, last scheduled incomplete workflow must also be handled
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
- WorkflowContext lastScheduledWorkflowCtx =
- TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
- if (lastScheduledWorkflowCtx != null &&
- !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
- || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
- setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+ if (lastScheduledWorkflow != null) {
+ WorkflowContext lastScheduledWorkflowCtx =
+ TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+ if (lastScheduledWorkflowCtx != null && !(
+ lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+ || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+ }
}
+ */
}
/** Helper function to change target state for a given workflow */
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index abd1882..7b17043 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -48,6 +48,7 @@ public class TaskRunner implements Runnable {
// If true, indicates that the task has finished.
private volatile boolean _done = false;
+
public TaskRunner(Task task, String taskName, String taskPartition, String instance,
HelixManager manager, String sessionId) {
_task = task;
@@ -111,7 +112,9 @@ public class TaskRunner implements Runnable {
* Signals the task to cancel itself.
*/
public void cancel() {
- _task.cancel();
+ if (!_done) {
+ _task.cancel();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 30939fc..525a38b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -205,22 +205,22 @@ public class TaskStateModel extends StateModel {
@Transition(to = "INIT", from = "COMPLETED")
public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "STOPPED")
public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "TIMED_OUT")
public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "TASK_ERROR")
public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f69605e..259b72c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -49,19 +49,19 @@ public class Workflow {
public static final String UNSPECIFIED = "UNSPECIFIED";
/** Workflow name */
- private String _name;
+ protected String _name;
/** Holds workflow-level configurations */
- private WorkflowConfig _workflowConfig;
+ protected WorkflowConfig _workflowConfig;
/** Contains the per-job configurations for all jobs specified in the provided dag */
- private Map<String, Map<String, String>> _jobConfigs;
+ protected Map<String, Map<String, String>> _jobConfigs;
/** Containers the per-job configurations of all individually-specified tasks */
- private Map<String, List<TaskConfig>> _taskConfigs;
+ protected Map<String, List<TaskConfig>> _taskConfigs;
/** Constructs and validates a workflow against a provided dag and config set */
- private Workflow(String name, WorkflowConfig workflowConfig,
+ protected Workflow(String name, WorkflowConfig workflowConfig,
Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
_name = name;
_workflowConfig = workflowConfig;
@@ -225,12 +225,14 @@ public class Workflow {
/** Build a workflow incrementally from dependencies and single configs, validate at build time */
public static class Builder {
- private String _name;
- private JobDag _dag;
- private Map<String, Map<String, String>> _jobConfigs;
- private Map<String, List<TaskConfig>> _taskConfigs;
- private ScheduleConfig _scheduleConfig;
- private long _expiry;
+ protected String _name;
+ protected JobDag _dag;
+ protected Map<String, Map<String, String>> _jobConfigs;
+ protected Map<String, List<TaskConfig>> _taskConfigs;
+ protected ScheduleConfig _scheduleConfig;
+ protected long _expiry;
+ protected Map<String, String> _cfgMap;
+ protected int _parallelJobs = -1;
public Builder(String name) {
_name = name;
@@ -287,6 +289,11 @@ public class Workflow {
return this;
}
+ public Builder fromMap(Map<String, String> cfg) {
+ _cfgMap = cfg;
+ return this;
+ }
+
public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
_scheduleConfig = scheduleConfig;
return this;
@@ -301,13 +308,30 @@ public class Workflow {
return TaskUtil.getNamespacedJobName(_name, job);
}
+ public Builder parallelJobs(int parallelJobs) {
+ _parallelJobs = parallelJobs;
+ return this;
+ }
+
public Workflow build() {
+ WorkflowConfig.Builder builder = buildWorkflowConfig();
+ // calls validate internally
+ return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+ }
+
+ protected WorkflowConfig.Builder buildWorkflowConfig() {
for (String task : _jobConfigs.keySet()) {
// addConfig(task, TaskConfig.WORKFLOW_ID, _name);
_jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
}
- WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+ WorkflowConfig.Builder builder;
+ if (_cfgMap != null) {
+ builder = WorkflowConfig.Builder.fromMap(_cfgMap);
+ } else {
+ builder = new WorkflowConfig.Builder();
+ }
+
builder.setJobDag(_dag);
builder.setTargetState(TargetState.START);
if (_scheduleConfig != null) {
@@ -316,8 +340,11 @@ public class Workflow {
if (_expiry > 0) {
builder.setExpiry(_expiry);
}
- return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
- // internally
+ if (_parallelJobs != -1) {
+ builder.setParallelJobs(_parallelJobs);
+ }
+
+ return builder;
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index f15f235..56fba58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -100,6 +100,20 @@ public class WorkflowConfig {
return defaultDateFormat;
}
+ /**
+ * Get the scheduled start time of the workflow.
+ *
+ * @return start time if the workflow schedule is set, null if no schedule config set.
+ */
+ public Date getStartTime() {
+ // Workflow with non-scheduled config is ready to schedule immediately.
+ if (_scheduleConfig == null) {
+ return null;
+ }
+
+ return _scheduleConfig.getStartTime();
+ }
+
public Map<String, String> getResourceConfigMap() throws Exception {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1f17e92..a00a736 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -274,7 +274,6 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that the class was invoked
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index eef1ce6..38c9113 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -179,7 +179,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
return cal.getTime();
}
- private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+ private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
@@ -191,11 +191,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
cfgMap.put(WorkflowConfig.START_TIME,
WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
//cfgMap.put(WorkflowConfig.START_TIME,
- //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
- return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
+ //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+ return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
}
- private JobQueue buildRecurrentJobQueue(String jobQueueName) {
+
+ private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
return buildRecurrentJobQueue(jobQueueName, 0);
}
@@ -205,9 +206,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
-
+ JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
for (int i = 0; i <= 1; i++) {
@@ -218,10 +217,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuild.enqueueJob(jobName, job);
currentJobNames.add(jobName);
}
+ _driver.start(queueBuild.build());
+
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
// ensure job 1 is started before stop it
@@ -234,8 +235,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.delete(queueName);
Thread.sleep(500);
- queue = buildRecurrentJobQueue(queueName, 5);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
currentJobNames.clear();
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -245,10 +245,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuilder.enqueueJob(jobName, job);
currentJobNames.add(jobName);
}
+ _driver.createQueue(queueBuilder.build());
+
+
wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
// ensure jobs are started and completed
@@ -269,12 +272,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+ Thread.sleep(100);
for (int i = 0; i <= 4; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -285,9 +288,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
LOG.info("Enqueuing job: " + jobName);
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuilder.enqueueJob(jobName, job);
currentJobNames.add(i, jobName);
}
+ _driver.createQueue(queueBuilder.build());
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
@@ -360,8 +364,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
// create jobs
List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -369,7 +372,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
final int JOB_COUNTS = 3;
-
for (int i = 0; i < JOB_COUNTS; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -384,8 +386,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// enqueue all jobs except last one
for (int i = 0; i < JOB_COUNTS - 1; ++i) {
LOG.info("Enqueuing job: " + jobNames.get(i));
- _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i));
+ queueBuilder.enqueueJob(jobNames.get(i), jobs.get(i));
}
+
+ _driver.createQueue(queueBuilder.build());
+
String currentLastJob = jobNames.get(JOB_COUNTS - 2);
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
@@ -398,6 +403,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// enqueue the last job
LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
_driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1));
+ _driver.stop(queueName);
// remove the last job
_driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1));
@@ -413,8 +419,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
// create jobs
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
@@ -431,8 +436,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetPartitionStates(Sets.newHashSet("MASTER"));
// enqueue both jobs
- _driver.enqueueJob(queueName, "job1", job1);
- _driver.enqueueJob(queueName, "job2", job2);
+ queueBuilder.enqueueJob("job1", job1);
+ queueBuilder.enqueueJob("job2", job2);
+
+ _driver.createQueue(queueBuilder.build());
+
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index f6fc53a..2ff8c56 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -156,7 +156,9 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
String queueName = TestHelper.getTestMethodName();
- JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build();
+ JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
+ queueBuild.parallelJobs(PARALLEL_COUNT);
+ JobQueue queue = queueBuild.build();
_driver.createQueue(queue);
List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();