You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:35 UTC
[3/4] helix git commit: Clean up jobs in a jobqueue automatically
after the job completes and passes its expiry time.
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8e72f7a..6e6727c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
-import org.apache.log4j.Logger;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -39,6 +30,23 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.log4j.Logger;
+
+
/**
* Custom rebalancer implementation for the {@code Workflow} in task state model.
*/
@@ -52,7 +60,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
LOG.debug("Computer Best Partition for workflow: " + workflow);
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflow);
if (workflowCfg == null) {
LOG.warn("Workflow configuration is NULL for " + workflow);
return buildEmptyAssignment(workflow, currStateOutput);
@@ -70,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
- cleanupWorkflow(workflow, workflowCfg);
+ cleanupWorkflow(workflow, workflowCfg);
return buildEmptyAssignment(workflow, currStateOutput);
}
@@ -124,7 +132,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
}
- cleanExpiredJobs(workflowCfg, workflowCtx);
+ // clean up the expired jobs if it is a queue.
+ if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+ purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+ }
TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
return buildEmptyAssignment(workflow, currStateOutput);
@@ -158,7 +169,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
// check ancestor job status
if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
+
// Since the start time is calculated base on the time of completion of parent jobs for this
// job, the calculated start time should only be calculate once. Persist the calculated time
// in WorkflowContext znode.
@@ -440,140 +452,61 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
- * Cleans up workflow configs and workflow contexts associated with this workflow,
- * including all job-level configs and context, plus workflow-level information.
+ * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
+ * contexts associated with this workflow, and all jobs information, including their configs,
+ * context, IS and EV.
*/
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- /*
- if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
- LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
- return;
- }*/
-
- for (String job : workflowcfg.getJobDag().getAllNodes()) {
- cleanupJob(job, workflow);
- }
-
- // clean up workflow-level info if this was the last in workflow
if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
- // clean up IS & EV
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
-
- // delete workflow config
- PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
- if (accessor.getProperty(workflowCfgKey) != null) {
- if (!accessor.removeProperty(workflowCfgKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
- workflow, workflowCfgKey));
- }
+ Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+ // Remove all pending timer tasks for this workflow if exists
+ _scheduledRebalancer.removeScheduledRebalance(workflow);
+ for (String job : jobs) {
+ _scheduledRebalancer.removeScheduledRebalance(job);
}
- // Delete workflow context
- LOG.info("Removing workflow context: " + workflow);
- if (!TaskUtil.removeWorkflowContext(_manager, workflow)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up workflow %s. Aborting further clean up steps.",
- workflow));
+ if (!TaskUtil.removeWorkflow(_manager, workflow, jobs)) {
+ LOG.warn("Failed to clean up workflow " + workflow);
}
-
- // Remove pending timer task for this workflow if exists
- _scheduledRebalancer.removeScheduledRebalance(workflow);
+ } else {
+ LOG.info("Did not clean up workflow " + workflow
+ + " because neither the workflow is non-terminable nor is set to DELETE.");
}
}
-
/**
- * Cleans up job configs and job contexts associated with this job,
- * including all job-level configs and context, plus the job info in the workflow context.
+ * Clean up all jobs that are COMPLETED and passes its expiry time.
+ *
+ * @param workflowConfig
+ * @param workflowContext
*/
- private void cleanupJob(final String job, String workflow) {
- LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
- // Remove any idealstate and externalView.
- cleanupIdealStateExtView(accessor, job);
-
- // Remove DAG references in workflow
- PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- for (String child : jobDag.getDirectChildren(job)) {
- jobDag.getChildrenToParents().get(child).remove(job);
- }
- for (String parent : jobDag.getDirectParents(job)) {
- jobDag.getParentsToChildren().get(parent).remove(job);
- }
- jobDag.getChildrenToParents().remove(job);
- jobDag.getParentsToChildren().remove(job);
- jobDag.getAllNodes().remove(job);
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- LOG.error("Could not update DAG for job: " + job, e);
- }
- } else {
- LOG.error("Could not update DAG for job: " + job + " ZNRecord is null.");
- }
- return currentData;
- }
- };
- accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
- AccessOption.PERSISTENT);
-
- // Delete job configs.
- PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
- if (accessor.getProperty(cfgKey) != null) {
- if (!accessor.removeProperty(cfgKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
- job, cfgKey));
- }
- }
-
- // Delete job context
- // For recurring workflow, it's OK if the node doesn't exist.
- if (!TaskUtil.removeJobContext(_manager, job)) {
- LOG.warn(String.format("Error occurred while trying to clean up job %s.", job));
- }
-
- LOG.info(String.format("Successfully cleaned up job context %s.", job));
-
- _scheduledRebalancer.removeScheduledRebalance(job);
- }
-
- private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
- if (workflowContext == null) {
+ // TODO: run this in a separate thread.
+ // Get all jobConfigs & jobContext from ClusterCache.
+ protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
return;
}
- Map<String, TaskState> jobStates = workflowContext.getJobStates();
- long newTimeToClean = Long.MAX_VALUE;
- for (String job : workflowConfig.getJobDag().getAllNodes()) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
- JobContext jobContext = TaskUtil.getJobContext(_manager, job);
- // There is no ABORTED state for JobQueue Job. The job will die with workflow
- if (jobContext != null && jobStates.containsKey(job) && (
- jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) {
- if (System.currentTimeMillis() >= jobConfig.getExpiry() + jobContext.getFinishTime()) {
- cleanupJob(job, workflowConfig.getWorkflowId());
- } else {
- newTimeToClean =
- Math.min(newTimeToClean, jobConfig.getExpiry() + jobContext.getFinishTime());
- }
- }
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflowConfig, workflowContext);
+ for (String job : expiredJobs) {
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+ if (!TaskUtil
+ .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflow, expiredJobs, true)) {
+ LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
}
- if (newTimeToClean < Long.MAX_VALUE && newTimeToClean < _scheduledRebalancer
- .getRebalanceTime(workflowConfig.getWorkflowId())) {
- _scheduledRebalancer
- .scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean);
+ long currentTime = System.currentTimeMillis();
+ long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
+ workflowContext.setLastJobPurgeTime(currentTime);
+ long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+ if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+ _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
index 7688017..ef8f971 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -116,7 +116,7 @@ public class TaskAdmin {
driver.flushQueue(workflow);
break;
case clean:
- driver.cleanupJobQueue(workflow);
+ driver.cleanupQueue(workflow);
break;
default:
throw new IllegalArgumentException("Unknown command " + args[0]);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 641f13a..6036732 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -212,9 +212,10 @@ public class TaskTestUtil {
}
public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
- int failureThreshold) {
+ int failureThreshold, int capacity) {
WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
workflowCfgBuilder.setExpiry(120000);
+ workflowCfgBuilder.setCapacity(capacity);
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -228,8 +229,17 @@ public class TaskTestUtil {
return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+ int failureThreshold) {
+ return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500);
+ }
+
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
- return buildJobQueue(jobQueueName, 0, 0);
+ return buildJobQueue(jobQueueName, 0, 0, 500);
+ }
+
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) {
+ return buildJobQueue(jobQueueName, 0, 0, capacity);
}
public static WorkflowContext buildWorkflowContext(String workflowResource,
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 71fed49..a0a1617 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -52,7 +52,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.start(builder.build());
_driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
TaskState.FAILED);
- _driver.cleanupJobQueue(queueName);
+ _driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
}
@@ -71,7 +71,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.start(builder.build());
_driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),
TaskState.IN_PROGRESS);
- _driver.cleanupJobQueue(queueName);
+ _driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 16df022..4d0c3c6 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -19,6 +19,9 @@ package org.apache.helix.task;
* under the License.
*/
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.integration.task.MockTask;
@@ -39,34 +42,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
@Test
public void testCleanExpiredJobs() throws Exception {
- String workflowName = TestHelper.getTestMethodName();
- JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
long startTime = System.currentTimeMillis();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 8; i++) {
builder.enqueueJob("JOB" + i, jobBuilder);
- TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i),
+ }
+
+ for (int i = 0; i < 8; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
}
+ for (int i = 4; i < 6; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+ TaskTestUtil
+ .buildJobContext(startTime, startTime + 100000, TaskPartitionState.COMPLETED));
+ }
+
WorkflowContext workflowContext = TaskTestUtil
- .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
- TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+ TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.COMPLETED,
+ TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+
+ Set<String> jobsLeft = new HashSet<String>();
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 1));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 2));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 4));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 5));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 6));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 7));
+
_driver.start(builder.build());
_cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
- TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
TaskTestUtil.calculateBestPossibleState(_cache, _manager);
- WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
- Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3);
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+ workflowContext = _driver.getWorkflowContext(queue);
+ Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
+ && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());
}
- @Test void testNotCleanJobsDueToParentFail() throws Exception {
- String workflowName = TestHelper.getTestMethodName();
- JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+ @Test
+ void testNotCleanJobsDueToParentFail() throws Exception {
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
@@ -76,17 +102,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
builder.enqueueJob("JOB0", jobBuilder);
builder.enqueueJob("JOB1", jobBuilder);
builder.addParentChildDependency("JOB0", "JOB1");
- TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB0"),
TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
WorkflowContext workflowContext = TaskTestUtil
- .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED,
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.FAILED,
TaskState.FAILED);
_driver.start(builder.build());
_cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
- TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
TaskTestUtil.calculateBestPossibleState(_cache, _manager);
- WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
- Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1);
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2);
+ }
+
+ @Test
+ void testNotCleanJobsThroughEnqueueJob() throws Exception {
+ int capacity = 5;
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue, capacity);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+ .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ }
+
+ _driver.start(builder.build());
+ try {
+ // should fail here since the queue is full.
+ _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+ Assert.fail("Queue is not full.");
+ } catch (HelixException e) {
+ Assert.assertTrue(e.getMessage().contains("queue is full"));
+ }
+
+ for (int i = 0; i < capacity; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+ TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+ }
+
+ WorkflowContext workflowContext = TaskTestUtil
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.IN_PROGRESS);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
+
+ _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), capacity - 1);
}
}