You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/11/30 18:19:06 UTC
[3/3] helix git commit: [HELIX-616] Change JobQueue to be subclass of
Workflow instead of WorkflowConfig.
[HELIX-616] Change JobQueue to be subclass of Workflow instead of WorkflowConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bbb20be
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bbb20be
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bbb20be
Branch: refs/heads/helix-0.6.x
Commit: 7bbb20be67a939a57f33d8f6d7c814b1dc246575
Parents: 7569a0a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Nov 20 15:54:34 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Nov 20 15:54:34 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobQueue.java | 66 ++++++++++----------
.../java/org/apache/helix/task/TaskDriver.java | 42 ++++++++-----
.../java/org/apache/helix/task/TaskRunner.java | 5 +-
.../org/apache/helix/task/TaskStateModel.java | 8 +--
.../java/org/apache/helix/task/Workflow.java | 55 +++++++++++-----
.../org/apache/helix/task/WorkflowConfig.java | 14 +++++
.../task/TestIndependentTaskRebalancer.java | 1 -
.../integration/task/TestRecurringJobQueue.java | 52 ++++++++-------
.../task/TestTaskRebalancerParallel.java | 4 +-
9 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index bca5911..0280c88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -19,31 +19,26 @@ package org.apache.helix.task;
* under the License.
*/
+import org.apache.helix.HelixException;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
* A named queue to which jobs can be added
*/
-public class JobQueue extends WorkflowConfig {
+public class JobQueue extends Workflow {
/* Config fields */
public static final String CAPACITY = "CAPACITY";
- private final String _name;
private final int _capacity;
- private JobQueue(String name, int capacity, WorkflowConfig config) {
- super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(),
- config.isTerminable(), config.getScheduleConfig());
- _name = name;
+ private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+ Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
+ super(name, workflowConfig, jobConfigs, taskConfigs);
_capacity = capacity;
- }
-
- /**
- * Get the name of this queue
- * @return queue name
- */
- public String getName() {
- return _name;
+ validate();
}
/**
@@ -54,31 +49,24 @@ public class JobQueue extends WorkflowConfig {
return _capacity;
}
- @Override
public Map<String, String> getResourceConfigMap() throws Exception {
- Map<String, String> cfgMap = super.getResourceConfigMap();
+ Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
cfgMap.put(CAPACITY, String.valueOf(_capacity));
return cfgMap;
}
- /** Supports creation of a single empty queue */
- public static class Builder {
- private WorkflowConfig.Builder _builder;
- private final String _name;
+ /** Supports creation of a single queue */
+ public static class Builder extends Workflow.Builder {
private int _capacity = Integer.MAX_VALUE;
+ private List<String> jobs;
public Builder(String name) {
- _builder = new WorkflowConfig.Builder();
- _name = name;
- }
-
- public Builder parallelJobs(int parallelJobs) {
- _builder.setParallelJobs(parallelJobs);
- return this;
+ super(name);
+ jobs = new ArrayList<String>();
}
public Builder expiry(long expiry) {
- _builder.setExpiry(expiry);
+ _expiry = expiry;
return this;
}
@@ -87,18 +75,32 @@ public class JobQueue extends WorkflowConfig {
return this;
}
+ @Override
public Builder fromMap(Map<String, String> cfg) {
- _builder = WorkflowConfig.Builder.fromMap(cfg);
+ super.fromMap(cfg);
if (cfg.containsKey(CAPACITY)) {
_capacity = Integer.parseInt(cfg.get(CAPACITY));
}
return this;
}
+ public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+ if (jobs.size() >= _capacity) {
+ throw new HelixException("Failed to push new job to jobQueue, it is already full");
+ }
+ addJobConfig(job, jobBuilder);
+ if (jobs.size() > 0) {
+ String previousJob = jobs.get(jobs.size() - 1);
+ addParentChildDependency(previousJob, job);
+ }
+ jobs.add(job);
+ }
+
public JobQueue build() {
- _builder.setTerminable(false);
- WorkflowConfig workflowConfig = _builder.build();
- return new JobQueue(_name, _capacity, workflowConfig);
+ WorkflowConfig.Builder builder = buildWorkflowConfig();
+ builder.setTerminable(false);
+ WorkflowConfig workflowConfig = builder.build();
+ return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cc1eac1..654ba4e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -191,8 +191,8 @@ public class TaskDriver {
String flowName = flow.getName();
// first, add workflow config to ZK
- _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow
- .getWorkflowConfig().getResourceConfigMap());
+ _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+ flow.getWorkflowConfig().getResourceConfigMap());
// then schedule jobs
for (String job : flow.getJobConfigs().keySet()) {
@@ -206,14 +206,7 @@ public class TaskDriver {
/** Creates a new named job queue (workflow) */
public void createQueue(JobQueue queue) throws Exception {
- String queueName = queue.getName();
- HelixProperty property = new HelixProperty(queueName);
- property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
- boolean created =
- _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property);
- if (!created) {
- throw new IllegalArgumentException("Queue " + queueName + " already exists!");
- }
+ start(queue);
}
/** Flushes a named job queue */
@@ -566,20 +559,35 @@ public class TaskDriver {
setWorkflowTargetState(workflow, TargetState.DELETE);
}
- /** Helper function to change target state for a given workflow */
+ /**
+ * Helper function to change target state for a given workflow
+ */
private void setWorkflowTargetState(String workflowName, TargetState state) {
setSingleWorkflowTargetState(workflowName, state);
+ // TODO: this is the temporary fix for current task rebalance implementation.
+ // We should fix this in new task framework implementation.
+ List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
+ for (String resource : resources) {
+ if (resource.startsWith(workflowName)) {
+ setSingleWorkflowTargetState(resource, state);
+ }
+ }
+
+ /* TODO: use this code for new task framework.
// For recurring schedules, last scheduled incomplete workflow must also be handled
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
- WorkflowContext lastScheduledWorkflowCtx =
- TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
- if (lastScheduledWorkflowCtx != null &&
- !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
- || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
- setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+ if (lastScheduledWorkflow != null) {
+ WorkflowContext lastScheduledWorkflowCtx =
+ TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+ if (lastScheduledWorkflowCtx != null && !(
+ lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+ || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
+ }
}
+ */
}
/** Helper function to change target state for a given workflow */
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index abd1882..7b17043 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -48,6 +48,7 @@ public class TaskRunner implements Runnable {
// If true, indicates that the task has finished.
private volatile boolean _done = false;
+
public TaskRunner(Task task, String taskName, String taskPartition, String instance,
HelixManager manager, String sessionId) {
_task = task;
@@ -111,7 +112,9 @@ public class TaskRunner implements Runnable {
* Signals the task to cancel itself.
*/
public void cancel() {
- _task.cancel();
+ if (!_done) {
+ _task.cancel();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 30939fc..525a38b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -205,22 +205,22 @@ public class TaskStateModel extends StateModel {
@Transition(to = "INIT", from = "COMPLETED")
public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "STOPPED")
public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "TIMED_OUT")
public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "TASK_ERROR")
public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index f69605e..259b72c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -49,19 +49,19 @@ public class Workflow {
public static final String UNSPECIFIED = "UNSPECIFIED";
/** Workflow name */
- private String _name;
+ protected String _name;
/** Holds workflow-level configurations */
- private WorkflowConfig _workflowConfig;
+ protected WorkflowConfig _workflowConfig;
/** Contains the per-job configurations for all jobs specified in the provided dag */
- private Map<String, Map<String, String>> _jobConfigs;
+ protected Map<String, Map<String, String>> _jobConfigs;
/** Containers the per-job configurations of all individually-specified tasks */
- private Map<String, List<TaskConfig>> _taskConfigs;
+ protected Map<String, List<TaskConfig>> _taskConfigs;
/** Constructs and validates a workflow against a provided dag and config set */
- private Workflow(String name, WorkflowConfig workflowConfig,
+ protected Workflow(String name, WorkflowConfig workflowConfig,
Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
_name = name;
_workflowConfig = workflowConfig;
@@ -225,12 +225,14 @@ public class Workflow {
/** Build a workflow incrementally from dependencies and single configs, validate at build time */
public static class Builder {
- private String _name;
- private JobDag _dag;
- private Map<String, Map<String, String>> _jobConfigs;
- private Map<String, List<TaskConfig>> _taskConfigs;
- private ScheduleConfig _scheduleConfig;
- private long _expiry;
+ protected String _name;
+ protected JobDag _dag;
+ protected Map<String, Map<String, String>> _jobConfigs;
+ protected Map<String, List<TaskConfig>> _taskConfigs;
+ protected ScheduleConfig _scheduleConfig;
+ protected long _expiry;
+ protected Map<String, String> _cfgMap;
+ protected int _parallelJobs = -1;
public Builder(String name) {
_name = name;
@@ -287,6 +289,11 @@ public class Workflow {
return this;
}
+ public Builder fromMap(Map<String, String> cfg) {
+ _cfgMap = cfg;
+ return this;
+ }
+
public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
_scheduleConfig = scheduleConfig;
return this;
@@ -301,13 +308,30 @@ public class Workflow {
return TaskUtil.getNamespacedJobName(_name, job);
}
+ public Builder parallelJobs(int parallelJobs) {
+ _parallelJobs = parallelJobs;
+ return this;
+ }
+
public Workflow build() {
+ WorkflowConfig.Builder builder = buildWorkflowConfig();
+ // calls validate internally
+ return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+ }
+
+ protected WorkflowConfig.Builder buildWorkflowConfig() {
for (String task : _jobConfigs.keySet()) {
// addConfig(task, TaskConfig.WORKFLOW_ID, _name);
_jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
}
- WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+ WorkflowConfig.Builder builder;
+ if (_cfgMap != null) {
+ builder = WorkflowConfig.Builder.fromMap(_cfgMap);
+ } else {
+ builder = new WorkflowConfig.Builder();
+ }
+
builder.setJobDag(_dag);
builder.setTargetState(TargetState.START);
if (_scheduleConfig != null) {
@@ -316,8 +340,11 @@ public class Workflow {
if (_expiry > 0) {
builder.setExpiry(_expiry);
}
- return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
- // internally
+ if (_parallelJobs != -1) {
+ builder.setParallelJobs(_parallelJobs);
+ }
+
+ return builder;
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index f15f235..56fba58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -100,6 +100,20 @@ public class WorkflowConfig {
return defaultDateFormat;
}
+ /**
+ * Get the scheduled start time of the workflow.
+ *
+ * @return start time if the workflow schedule is set, null if no schedule config set.
+ */
+ public Date getStartTime() {
+ // Workflow with non-scheduled config is ready to schedule immediately.
+ if (_scheduleConfig == null) {
+ return null;
+ }
+
+ return _scheduleConfig.getStartTime();
+ }
+
public Map<String, String> getResourceConfigMap() throws Exception {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1f17e92..a00a736 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -274,7 +274,6 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_driver.start(workflowBuilder.build());
// Ensure the job completes
- TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
// Ensure that the class was invoked
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index eef1ce6..38c9113 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -179,7 +179,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
return cal.getTime();
}
- private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+ private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
@@ -191,11 +191,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
cfgMap.put(WorkflowConfig.START_TIME,
WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
//cfgMap.put(WorkflowConfig.START_TIME,
- //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
- return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
+ //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+ return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
}
- private JobQueue buildRecurrentJobQueue(String jobQueueName) {
+
+ private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
return buildRecurrentJobQueue(jobQueueName, 0);
}
@@ -205,9 +206,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
-
+ JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
for (int i = 0; i <= 1; i++) {
@@ -218,10 +217,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuild.enqueueJob(jobName, job);
currentJobNames.add(jobName);
}
+ _driver.start(queueBuild.build());
+
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
// ensure job 1 is started before stop it
@@ -234,8 +235,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
_driver.delete(queueName);
Thread.sleep(500);
- queue = buildRecurrentJobQueue(queueName, 5);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
currentJobNames.clear();
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -245,10 +245,13 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuilder.enqueueJob(jobName, job);
currentJobNames.add(jobName);
}
+ _driver.createQueue(queueBuilder.build());
+
+
wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
// ensure jobs are started and completed
@@ -269,12 +272,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
// Create and Enqueue jobs
List<String> currentJobNames = new ArrayList<String>();
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+ Thread.sleep(100);
for (int i = 0; i <= 4; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -285,9 +288,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
LOG.info("Enqueuing job: " + jobName);
- _driver.enqueueJob(queueName, jobName, job);
+ queueBuilder.enqueueJob(jobName, job);
currentJobNames.add(i, jobName);
}
+ _driver.createQueue(queueBuilder.build());
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
@@ -360,8 +364,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
// create jobs
List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -369,7 +372,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
final int JOB_COUNTS = 3;
-
for (int i = 0; i < JOB_COUNTS; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -384,8 +386,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// enqueue all jobs except last one
for (int i = 0; i < JOB_COUNTS - 1; ++i) {
LOG.info("Enqueuing job: " + jobNames.get(i));
- _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i));
+ queueBuilder.enqueueJob(jobNames.get(i), jobs.get(i));
}
+
+ _driver.createQueue(queueBuilder.build());
+
String currentLastJob = jobNames.get(JOB_COUNTS - 2);
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
@@ -398,6 +403,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// enqueue the last job
LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
_driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1));
+ _driver.stop(queueName);
// remove the last job
_driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1));
@@ -413,8 +419,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue queue = buildRecurrentJobQueue(queueName);
- _driver.createQueue(queue);
+ JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
// create jobs
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
@@ -431,8 +436,11 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
.setTargetPartitionStates(Sets.newHashSet("MASTER"));
// enqueue both jobs
- _driver.enqueueJob(queueName, "job1", job1);
- _driver.enqueueJob(queueName, "job2", job2);
+ queueBuilder.enqueueJob("job1", job1);
+ queueBuilder.enqueueJob("job2", job2);
+
+ _driver.createQueue(queueBuilder.build());
+
WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
http://git-wip-us.apache.org/repos/asf/helix/blob/7bbb20be/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index f6fc53a..2ff8c56 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -156,7 +156,9 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
String queueName = TestHelper.getTestMethodName();
- JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build();
+ JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
+ queueBuild.parallelJobs(PARALLEL_COUNT);
+ JobQueue queue = queueBuild.build();
_driver.createQueue(queue);
List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();