Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:36 UTC
[4/4] helix git commit: Clean up jobs in a job queue automatically after the job completes and passes its expiry time.
Clean up jobs in a job queue automatically after the job completes and passes its expiry time.
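For context on how this behavior is expected to be used: a client marks a job with an expiry, and once the job reaches a final state and its expiry passes, the controller purges the job's config, context, IdealState and ExternalView from the queue. A minimal, illustrative sketch only (it assumes a connected HelixManager and the JobConfig.Builder#setExpiry setter; the queue, job and command names are placeholders, not part of this commit):

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.JobQueue;
    import org.apache.helix.task.TaskDriver;

    public class ExpiryCleanupSketch {
      public static void enqueueWithExpiry(HelixManager manager) {
        TaskDriver driver = new TaskDriver(manager);
        driver.start(new JobQueue.Builder("myQueue").build());

        // "Reindex" is a placeholder task command registered on the participants.
        JobConfig.Builder job = new JobConfig.Builder()
            .setCommand("Reindex")
            .setExpiry(60 * 1000L); // assumed setter: purge the job ~60s after it completes
        driver.enqueueJob("myQueue", "job1", job);
      }
    }
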
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e530bf51
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e530bf51
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e530bf51
Branch: refs/heads/master
Commit: e530bf5183e7ad2f3a27d0e75448b88e8554efe8
Parents: d2c3ebb
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jan 30 15:02:24 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700
----------------------------------------------------------------------
.../webapp/resources/JobQueueResource.java | 2 +-
.../helix/task/DeprecatedTaskRebalancer.java | 4 +-
.../main/java/org/apache/helix/task/JobDag.java | 48 +-
.../org/apache/helix/task/JobRebalancer.java | 22 +-
.../java/org/apache/helix/task/TaskDriver.java | 437 ++++++-----------
.../org/apache/helix/task/TaskRebalancer.java | 46 +-
.../org/apache/helix/task/TaskStateModel.java | 2 +-
.../java/org/apache/helix/task/TaskUtil.java | 486 ++++++++++++++++---
.../org/apache/helix/task/WorkflowConfig.java | 8 +
.../org/apache/helix/task/WorkflowContext.java | 52 +-
.../apache/helix/task/WorkflowRebalancer.java | 193 +++-----
.../java/org/apache/helix/tools/TaskAdmin.java | 2 +-
.../helix/integration/task/TaskTestUtil.java | 14 +-
.../integration/task/TestJobQueueCleanUp.java | 4 +-
.../apache/helix/task/TestCleanExpiredJobs.java | 100 +++-
15 files changed, 838 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 32b782d..3df78f2 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -180,7 +180,7 @@ public class JobQueueResource extends ServerResource {
break;
}
case clean: {
- driver.cleanupJobQueue(jobQueueName);
+ driver.cleanupQueue(jobQueueName);
break;
}
default:
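The REST resource now delegates to the renamed TaskDriver#cleanupQueue (previously cleanupJobQueue). For reference, a hedged sketch of invoking the same cleanup directly through the Java API, assuming 'manager' is an existing HelixManager connection:

    TaskDriver driver = new TaskDriver(manager);
    // Removes every ABORTED, FAILED or COMPLETED job (config, context, IdealState)
    // from the queue; flushQueue(...) in the TaskDriver diff below is kept as an alias.
    driver.cleanupQueue("myQueue");
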
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 14c559c..8624398 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -125,7 +125,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
LOG.debug("Computer Best Partition for resource: " + resourceName);
// Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+ JobConfig jobCfg = TaskUtil.getJobConfig(_manager, resourceName);
if (jobCfg == null) {
LOG.debug("Job configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
@@ -133,7 +133,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
if (workflowCfg == null) {
LOG.debug("Workflow configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 32e1ffa..98a8c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -34,6 +35,8 @@ import org.codehaus.jackson.map.ObjectMapper;
* and validate a job dependency graph
*/
public class JobDag {
+ private static final Logger LOG = Logger.getLogger(JobDag.class);
+
@JsonProperty("parentsToChildren")
private Map<String, Set<String>> _parentsToChildren;
@@ -66,7 +69,7 @@ public class JobDag {
_allNodes.add(child);
}
- public void removeParentToChild(String parent, String child) {
+ private void removeParentToChild(String parent, String child) {
if (_parentsToChildren.containsKey(parent)) {
Set<String> children = _parentsToChildren.get(parent);
children.remove(child);
@@ -91,7 +94,7 @@ public class JobDag {
/**
* must make sure no other node dependence before removing the node
*/
- public void removeNode(String node) {
+ private void removeNode(String node) {
if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
throw new IllegalStateException(
"The node is either a parent or a child of other node, could not be deleted");
@@ -100,6 +103,47 @@ public class JobDag {
_allNodes.remove(node);
}
+ /**
+ * Remove a node from the DAG.
+ * @param job the job (DAG node) to be removed
+ * @param maintainDependency if true, the removed job's parent and child nodes are linked together;
+ * otherwise the job is removed directly without modifying the existing dependency links.
+ */
+ public void removeNode(String job, boolean maintainDependency) {
+ if (!_allNodes.contains(job)) {
+ LOG.info("Could not delete job " + job + " from DAG, node does not exist");
+ return;
+ }
+ if (maintainDependency) {
+ String parent = null;
+ String child = null;
+ // remove the node from the queue
+ for (String n : _allNodes) {
+ if (getDirectChildren(n).contains(job)) {
+ parent = n;
+ removeParentToChild(parent, job);
+ } else if (getDirectParents(n).contains(job)) {
+ child = n;
+ removeParentToChild(job, child);
+ }
+ }
+ if (parent != null && child != null) {
+ addParentToChild(parent, child);
+ }
+ removeNode(job);
+ } else {
+ for (String child : getDirectChildren(job)) {
+ getChildrenToParents().get(child).remove(job);
+ }
+ for (String parent : getDirectParents(job)) {
+ getParentsToChildren().get(parent).remove(job);
+ }
+ _childrenToParents.remove(job);
+ _parentsToChildren.remove(job);
+ removeNode(job);
+ }
+ }
+
public Map<String, Set<String>> getParentsToChildren() {
return _parentsToChildren;
}
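The new removeNode(String, boolean) overload either stitches the removed job's parent to its child (maintainDependency = true) or drops the job together with its edges. An illustrative sketch, assuming the existing public addNode/addParentToChild helpers and placeholder node names:

    JobDag dag = new JobDag();
    dag.addNode("q_job1");
    dag.addNode("q_job2");
    dag.addNode("q_job3");
    dag.addParentToChild("q_job1", "q_job2");
    dag.addParentToChild("q_job2", "q_job3");

    // q_job1 -> q_job3 is linked before q_job2 is dropped, so the chain stays connected
    dag.removeNode("q_job2", true);

    // q_job3 and its remaining edges are removed as-is
    dag.removeNode("q_job3", false);
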
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index c8deb35..d9093b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import java.util.TreeSet;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
@@ -69,7 +70,7 @@ public class JobRebalancer extends TaskRebalancer {
LOG.debug("Computer Best Partition for job: " + jobName);
// Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+ JobConfig jobCfg = TaskUtil.getJobConfig(_manager, jobName);
if (jobCfg == null) {
LOG.error("Job configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -77,7 +78,7 @@ public class JobRebalancer extends TaskRebalancer {
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
if (workflowCfg == null) {
LOG.error("Workflow configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -105,7 +106,7 @@ public class JobRebalancer extends TaskRebalancer {
LOG.info(String.format(
"Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
workflowResource, jobName, workflowState, jobState));
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
_scheduledRebalancer.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -358,7 +359,7 @@ public class JobRebalancer extends TaskRebalancer {
addAllPartitions(allPartitions, partitionsToDropFromIs);
// remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
return buildEmptyAssignment(jobResource, currStateOutput);
} else {
skippedPartitions.add(pId);
@@ -397,7 +398,7 @@ public class JobRebalancer extends TaskRebalancer {
markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
// remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
}
// Make additional task assignments if needed.
@@ -693,7 +694,7 @@ public class JobRebalancer extends TaskRebalancer {
}
for (Partition partition : assignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+ int pId = getPartitionId(partition.getPartitionName());
if (includeSet.contains(pId)) {
Map<String, String> replicaMap = assignment.getReplicaMap(partition);
for (String instance : replicaMap.keySet()) {
@@ -707,6 +708,15 @@ public class JobRebalancer extends TaskRebalancer {
return result;
}
+ /* Extracts the partition id from the given partition name. */
+ private static int getPartitionId(String pName) {
+ int index = pName.lastIndexOf("_");
+ if (index == -1) {
+ throw new HelixException("Invalid partition name " + pName);
+ }
+ return Integer.valueOf(pName.substring(index + 1));
+ }
+
private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
Set<Integer> nonReadyPartitions = Sets.newHashSet();
for (int p : ctx.getPartitionSet()) {
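The getPartitionId helper copied into this class parses the numeric suffix after the last underscore of a partition name. For illustration only (the method is private to the rebalancer):

    // e.g. a task partition named "myQueue_myJob_3" yields id 3;
    // a name with no underscore causes a HelixException to be thrown
    int pId = getPartitionId("myQueue_myJob_3");
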
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index df5cdf6..15c906a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -88,7 +88,7 @@ public class TaskDriver {
public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+ new ZkHelixPropertyStore<>(baseAccessor,
PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
}
@@ -136,7 +136,7 @@ public class TaskDriver {
newWorkflowConfig.setJobTypes(jobTypes);
// add workflow config.
- if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+ if (!TaskUtil.setWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
}
@@ -171,7 +171,7 @@ public class TaskDriver {
*/
public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
WorkflowConfig currentConfig =
- TaskUtil.getWorkflowCfg(_accessor, workflow);
+ TaskUtil.getWorkflowConfig(_accessor, workflow);
if (currentConfig == null) {
throw new HelixException("Workflow " + workflow + " does not exist!");
}
@@ -181,7 +181,9 @@ public class TaskDriver {
"Workflow " + workflow + " is terminable, not allow to change its configuration!");
}
- if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
+ // Do not allow the user to change the DAG of the workflow
+ newWorkflowConfig.setJobDag(currentConfig.getJobDag());
+ if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
LOG.error("Failed to update workflow configuration for workflow " + workflow);
}
@@ -198,283 +200,124 @@ public class TaskDriver {
}
/**
- * Remove all jobs in a job queue
+ * Remove all jobs in final states (ABORTED, FAILED, COMPLETED) from a job queue.
+ * Same as {@link #cleanupQueue(String)}.
*
- * @param queueName
+ * @param queue name of the queue
* @throws Exception
*/
- // TODO: need to make sure the queue is stopped or completed before flush the queue.
- public void flushQueue(String queueName) {
- WorkflowConfig config =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
- if (config == null) {
- throw new IllegalArgumentException("Queue does not exist!");
- }
-
- // Remove all ideal states and resource configs to trigger a drop event
- PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
- final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
- for (String resourceName : toRemove) {
- _accessor.removeProperty(keyBuilder.idealStates(resourceName));
- _accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
- // Delete context
- String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
- _propertyStore.remove(contextKey, AccessOption.PERSISTENT);
- }
-
- // Now atomically clear the DAG
- String path = keyBuilder.resourceConfig(queueName).getPath();
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- for (String resourceName : toRemove) {
- for (String child : jobDag.getDirectChildren(resourceName)) {
- jobDag.getChildrenToParents().get(child).remove(resourceName);
- }
- for (String parent : jobDag.getDirectParents(resourceName)) {
- jobDag.getParentsToChildren().get(parent).remove(resourceName);
- }
- jobDag.getChildrenToParents().remove(resourceName);
- jobDag.getParentsToChildren().remove(resourceName);
- jobDag.getAllNodes().remove(resourceName);
- }
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
- return currentData;
- }
- };
- _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
-
- // Now atomically clear the results
- path = Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
- updater = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- Map<String, String> states =
- currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
- if (states != null) {
- states.keySet().removeAll(toRemove);
- }
- return currentData;
- }
- };
- _propertyStore.update(path, updater, AccessOption.PERSISTENT);
+ public void flushQueue(String queue) {
+ cleanupQueue(queue);
}
/**
* Delete a job from an existing named queue,
* the queue has to be stopped prior to this call
*
- * @param queueName
- * @param jobName
+ * @param queue queue name
+ * @param job job name
*/
- public void deleteJob(final String queueName, final String jobName) {
+ public void deleteJob(final String queue, final String job) {
WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
+ TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowCfg == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
}
if (workflowCfg.isTerminable()) {
- throw new IllegalArgumentException(queueName + " is not a queue!");
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
boolean isRecurringWorkflow =
(workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
if (isRecurringWorkflow) {
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-
- String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
- // delete the current scheduled one
- deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
- // Remove the job from the original queue template's DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the template job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // Delete the job template from property store
- String jobPropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
- _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
- } else {
- deleteJobFromScheduledQueue(queueName, jobName, false);
- }
- }
-
- /**
- * delete a job from a scheduled (non-recurrent) queue.
- *
- * @param queueName
- * @param jobName
- */
- private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
- boolean isRecurrent) {
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_accessor, queueName);
-
- if (workflowCfg == null) {
- // When try to delete recurrent job, it could be either not started or finished. So
- // there may not be a workflow config.
- if (isRecurrent) {
- return;
- } else {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ // delete the job from the last scheduled queue, if one exists.
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ String lastScheduledQueue = null;
+ if (wCtx != null) {
+ lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ }
+ if (lastScheduledQueue != null) {
+ WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
+ if (lastWorkflowCfg != null) {
+ deleteJobFromQueue(lastScheduledQueue, job);
+ }
}
}
+ deleteJobFromQueue(queue, job);
+ }
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
- if (wCtx != null && wCtx.getWorkflowState() == null) {
- throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
- }
-
- String workflowState =
- (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+ private void deleteJobFromQueue(final String queue, final String job) {
+ WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ String workflowState = (workflowCtx != null)
+ ? workflowCtx.getWorkflowState().name()
+ : TaskState.NOT_STARTED.name();
if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
- throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+ throw new IllegalStateException("Queue " + queue + " is still running!");
}
- removeJob(queueName, jobName);
- }
-
- private void removeJob(String queueName, String jobName) {
- // Remove the job from the queue in the DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // update queue's property to remove job from JOB_STATES if it is already started.
- removeJobStateFromQueue(queueName, jobName);
-
- // Delete the job from property store
- TaskUtil.removeJobContext(_propertyStore, namespacedJobName);
- }
-
- /** Remove the job name from the DAG from the queue configuration */
- private void removeJobFromDag(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
- return null;
- }
- // Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- Set<String> allNodes = jobDag.getAllNodes();
- if (!allNodes.contains(namespacedJobName)) {
- LOG.warn(
- "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
- return currentData;
- }
- String parent = null;
- String child = null;
- // remove the node from the queue
- for (String node : allNodes) {
- if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
- parent = node;
- jobDag.removeParentToChild(parent, namespacedJobName);
- } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
- child = node;
- jobDag.removeParentToChild(namespacedJobName, child);
- }
- }
- if (parent != null && child != null) {
- jobDag.addParentToChild(parent, child);
- }
- jobDag.removeNode(namespacedJobName);
-
- // Save the updated DAG
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalStateException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
- }
- return currentData;
- }
- };
-
- String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
- if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
- throw new IllegalArgumentException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName);
+ if (workflowState.equals(TaskState.COMPLETED.name()) || workflowState.equals(
+ TaskState.FAILED.name()) || workflowState.equals(TaskState.ABORTED.name())) {
+ LOG.warn("Queue " + queue + " has already reached its final state, skip deleting job from it.");
+ return;
}
- }
-
- /** update queue's property to remove job from JOB_STATES if it is already started. */
- private void removeJobStateFromQueue(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- String queuePropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- Map<String, String> states =
- currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
- if (states != null && states.containsKey(namespacedJobName)) {
- states.keySet().remove(namespacedJobName);
- }
- }
- return currentData;
- }
- };
- if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
- LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
+ String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+ Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+ if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
+ LOG.error("Failed to delete job " + job + " from queue " + queue);
+ throw new HelixException("Failed to delete job " + job + " from queue " + queue);
}
}
/**
* Adds a new job to the end an existing named queue.
*
- * @param queueName
- * @param jobName
+ * @param queue
+ * @param job
* @param jobBuilder
* @throws Exception
*/
- public void enqueueJob(final String queueName, final String jobName,
+ public void enqueueJob(final String queue, final String job,
JobConfig.Builder jobBuilder) {
// Get the job queue config and capacity
- WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
- throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
+ throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
}
- boolean isTerminable = workflowConfig.isTerminable();
- if (isTerminable) {
- throw new IllegalArgumentException(queueName + " is not a queue!");
+ if (workflowConfig.isTerminable()) {
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
final int capacity = workflowConfig.getCapacity();
+ int queueSize = workflowConfig.getJobDag().size();
+ if (capacity > 0 && queueSize >= capacity) {
+ // if the queue is full, Helix will try to clean up expired jobs to free up space.
+ WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ if (workflowContext != null) {
+ Set<String> expiredJobs =
+ TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
+ if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
+ LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+ }
+ }
+ workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+ if (workflowConfig.getJobDag().size() >= capacity) {
+ throw new HelixException("Failed to enqueue a job, queue is full.");
+ }
+ }
// Create the job to ensure that it validates
- JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
-
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+ JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
+ final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
// add job config first.
addJobConfig(namespacedJobName, jobConfig);
final String jobType = jobConfig.getJobType();
- // Add the job to the end of the queue in the DAG
+ // update the job dag to append the job to the end of the queue.
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
@@ -484,11 +327,11 @@ public class TaskDriver {
Set<String> allNodes = jobDag.getAllNodes();
if (capacity > 0 && allNodes.size() >= capacity) {
throw new IllegalStateException(
- "Queue " + queueName + " is at capacity, will not add " + jobName);
+ "Queue " + queue + " already reaches its max capacity, failed to add " + job);
}
if (allNodes.contains(namespacedJobName)) {
throw new IllegalStateException(
- "Could not add to queue " + queueName + ", job " + jobName + " already exists");
+ "Could not add to queue " + queue + ", job " + job + " already exists");
}
jobDag.addNode(namespacedJobName);
@@ -511,7 +354,7 @@ public class TaskDriver {
if (jobTypes == null) {
jobTypes = new HashMap<String, String>();
}
- jobTypes.put(jobName, jobType);
+ jobTypes.put(queue, jobType);
currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
@@ -520,51 +363,58 @@ public class TaskDriver {
currentData
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
- throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+ throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
e);
}
return currentData;
}
};
- String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+ String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
- throw new IllegalArgumentException("Could not enqueue job");
+ throw new HelixException("Failed to enqueue job");
}
// This is to make it back-compatible with old Helix task driver.
- addWorkflowResourceIfNecessary(queueName);
+ addWorkflowResourceIfNecessary(queue);
// Schedule the job
- RebalanceScheduler.invokeRebalance(_accessor, queueName);
+ RebalanceScheduler.invokeRebalance(_accessor, queue);
}
/**
- * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
- * The job config, job context will be removed from Zookeeper.
+ * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The
+ * job config, job context will be removed from Zookeeper.
*
- * @param queueName The name of job queue
+ * @param queue The name of job queue
*/
- public void cleanupJobQueue(String queueName) {
- WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
+ public void cleanupQueue(String queue) {
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
- if (workflowCfg == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ if (workflowConfig == null) {
+ throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+ }
+
+ boolean isTerminable = workflowConfig.isTerminable();
+ if (isTerminable) {
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
- if (wCtx != null && wCtx.getWorkflowState() == null) {
- throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ if (wCtx == null || wCtx.getWorkflowState() == null) {
+ throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
}
- for (String jobNode : workflowCfg.getJobDag().getAllNodes()) {
+ Set<String> jobs = new HashSet<String>();
+ for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
TaskState curState = wCtx.getJobState(jobNode);
if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
|| curState == TaskState.FAILED)) {
- removeJob(queueName, TaskUtil.getDenamespacedJobName(queueName, jobNode));
+ jobs.add(jobNode);
}
}
+
+ TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true);
}
/** Posts new workflow resource to cluster */
@@ -577,7 +427,6 @@ public class TaskDriver {
.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
_admin.setResourceIdealState(_clusterName, workflow, is);
-
}
/**
@@ -605,13 +454,13 @@ public class TaskDriver {
/**
* Add new job config to cluster
*/
- private void addJobConfig(String jobName, JobConfig jobConfig) {
- LOG.info("Add job configuration " + jobName);
+ private void addJobConfig(String job, JobConfig jobConfig) {
+ LOG.info("Add job configuration " + job);
// Set the job configuration
- JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
- if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
- LOG.error("Failed to add job configuration for job " + jobName);
+ JobConfig newJobCfg = new JobConfig(job, jobConfig);
+ if (!TaskUtil.setJobConfig(_accessor, job, newJobCfg)) {
+ throw new HelixException("Failed to add job configuration for job " + job);
}
}
@@ -691,14 +540,15 @@ public class TaskDriver {
/**
* Helper function to change target state for a given workflow
*/
- private void setWorkflowTargetState(String workflowName, TargetState state) {
- setSingleWorkflowTargetState(workflowName, state);
-
- // TODO: just need to change the lastScheduledWorkflow.
- List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
- for (String resource : resources) {
- if (resource.startsWith(workflowName)) {
- setSingleWorkflowTargetState(resource, state);
+ private void setWorkflowTargetState(String workflow, TargetState state) {
+ setSingleWorkflowTargetState(workflow, state);
+
+ // For recurring schedules, last scheduled incomplete workflow must also be handled
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+ if (wCtx != null) {
+ String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+ if (lastScheduledWorkflow != null) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
}
}
}
@@ -706,42 +556,47 @@ public class TaskDriver {
/**
* Helper function to change target state for a given workflow
*/
- private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
- LOG.info("Set " + workflowName + " to target state " + state);
+ private void setSingleWorkflowTargetState(String workflow, final TargetState state) {
+ LOG.info("Set " + workflow + " to target state " + state);
+
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
+ if (workflowConfig == null) {
+ LOG.warn("WorkflowConfig for " + workflow + " not found!");
+ return;
+ }
+
+ WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+ if (state != TargetState.DELETE && workflowContext != null &&
+ (workflowContext.getFinishTime() != WorkflowContext.UNFINISHED
+ || workflowContext.getWorkflowState() == TaskState.COMPLETED
+ || workflowContext.getWorkflowState() == TaskState.FAILED)) {
+ // Should not update target state for completed workflow
+ LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
+ + state);
+ return;
+ }
+
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
- // Only update target state for non-completed workflows
- String finishTime = currentData
- .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
- if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
- currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
- state.name());
- } else {
- LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
- }
+ currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+ state.name());
} else {
- LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
+ LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is "
+ + currentData);
}
return currentData;
}
};
- List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- List<String> paths = Lists.newArrayList();
-
- PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
- if (_accessor.getProperty(cfgKey) != null) {
- paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
- updaters.add(updater);
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- RebalanceScheduler.invokeRebalance(_accessor, workflowName);
- } else {
- LOG.error("Configuration path " + cfgKey + " not found!");
- }
+
+ PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
+ _accessor.getBaseDataAccessor()
+ .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+ RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
public WorkflowConfig getWorkflowConfig(String workflow) {
- return TaskUtil.getWorkflowCfg(_accessor, workflow);
+ return TaskUtil.getWorkflowConfig(_accessor, workflow);
}
public WorkflowContext getWorkflowContext(String workflow) {
@@ -749,7 +604,7 @@ public class TaskDriver {
}
public JobConfig getJobConfig(String job) {
- return TaskUtil.getJobCfg(_accessor, job);
+ return TaskUtil.getJobConfig(_accessor, job);
}
public JobContext getJobContext(String job) {
@@ -761,7 +616,7 @@ public class TaskDriver {
}
public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
- return TaskUtil.getWorkflowCfg(manager, workflow);
+ return TaskUtil.getWorkflowConfig(manager, workflow);
}
public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
@@ -769,7 +624,7 @@ public class TaskDriver {
}
public static JobConfig getJobConfig(HelixManager manager, String job) {
- return TaskUtil.getJobCfg(manager, job);
+ return TaskUtil.getJobConfig(manager, job);
}
/**
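With the capacity handling added to enqueueJob above, a full queue is first purged of expired finished jobs and the enqueue is rejected only if it is still at capacity. A hedged usage sketch, assuming 'driver' is a connected TaskDriver and the JobConfig.Builder#setExpiry setter; the names are placeholders:

    try {
      driver.enqueueJob("myQueue", "nightlyBackup",
          new JobConfig.Builder()
              .setCommand("Backup")          // placeholder task command
              .setExpiry(5 * 60 * 1000L));   // assumed setter: eligible for purge 5 minutes after completion
    } catch (HelixException e) {
      // the queue was still full even after expired, completed jobs were cleaned up
    }
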
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 312c499..20a9233 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -23,10 +23,8 @@ import java.util.Date;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -47,6 +45,7 @@ import com.google.common.collect.Maps;
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
public static final String START_TIME_KEY = "StartTime";
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+ protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
// For connection management
protected HelixManager _manager;
@@ -84,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
ctx.setJobState(jobToFail, TaskState.ABORTED);
_clusterStatusMonitor
- .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
+ .updateJobCounters(TaskUtil.getJobConfig(_manager, jobToFail), TaskState.ABORTED);
}
}
return true;
@@ -171,7 +170,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// If there is parent job failed, schedule the job only when ignore dependent
// job failure enabled
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
markJobFailed(job, null, workflowCfg, workflowCtx);
LOG.debug(
@@ -239,7 +238,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
protected void scheduleJobCleanUp(String jobName, WorkflowConfig workflowConfig,
long currentTime) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, jobName);
long currentScheduledTime =
_scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
? Long.MAX_VALUE
@@ -262,41 +261,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
}
- /**
- * Cleans up IdealState and external view associated with a job/workflow resource.
- */
- protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
- LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
-
- // Delete the ideal state itself.
- PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
- if (accessor.getProperty(isKey) != null) {
- if (!accessor.removeProperty(isKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
- resourceName, isKey));
- }
- } else {
- LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
- }
-
- // Delete dead external view
- // because job is already completed, there is no more current state change
- // thus dead external views removal will not be triggered
- PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
- if (accessor.getProperty(evKey) != null) {
- if (!accessor.removeProperty(evKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
- resourceName, evKey));
- }
- }
-
- LOG.info(String
- .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
- }
-
- @Override public IdealState computeNewIdealState(String resourceName,
+ @Override
+ public IdealState computeNewIdealState(String resourceName,
IdealState currentIdealState, CurrentStateOutput currentStateOutput,
ClusterDataCache clusterData) {
// All of the heavy lifting is in the ResourceAssignment computation,
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a7c58d2..61e0394 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -268,7 +268,7 @@ public class TaskStateModel extends StateModel {
}
private void startTask(Message msg, String taskPartition) {
- JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+ JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
TaskConfig taskConfig = null;
String command = cfg.getCommand();
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index effdd44..f064bbf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,13 @@ package org.apache.helix.task;
*/
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
@@ -38,6 +41,7 @@ import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -59,12 +63,12 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
*
* @param accessor Accessor to access Helix configs
- * @param jobResource The name of the job resource
+ * @param job The name of the job resource
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
- HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
+ protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) {
+ HelixProperty jobResourceConfig = getResourceConfig(accessor, job);
if (jobResourceConfig == null) {
return null;
}
@@ -76,12 +80,38 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
*
* @param manager HelixManager object used to connect to Helix.
- * @param jobResource The name of the job resource.
+ * @param job The name of the job resource.
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- protected static JobConfig getJobCfg(HelixManager manager, String jobResource) {
- return getJobCfg(manager.getHelixDataAccessor(), jobResource);
+ protected static JobConfig getJobConfig(HelixManager manager, String job) {
+ return getJobConfig(manager.getHelixDataAccessor(), job);
+ }
+
+ /**
+ * Set the job config
+ *
+ * @param accessor Accessor to Helix configs
+ * @param job The job name
+ * @param jobConfig The job config to be set
+ *
+ * @return True if set successfully, otherwise false
+ */
+ protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
+ JobConfig jobConfig) {
+ return setResourceConfig(accessor, job, jobConfig);
+ }
+
+ /**
+ * Remove a job config.
+ *
+ * @param accessor
+ * @param job
+ *
+ * @return
+ */
+ protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) {
+ return removeWorkflowJobConfig(accessor, job);
}
/**
@@ -93,7 +123,7 @@ public class TaskUtil {
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+ protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) {
HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
if (workflowCfg == null) {
return null;
@@ -111,21 +141,30 @@ public class TaskUtil {
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
- return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
+ protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
+ return getWorkflowConfig(manager.getHelixDataAccessor(), workflow);
}
/**
- * Set the resource config
+ * Set the workflow config
* @param accessor Accessor to Helix configs
- * @param resource The resource name
- * @param resourceConfig The resource config to be set
+ * @param workflow The workflow name
+ * @param workflowConfig The workflow config to be set
* @return True if set successfully, otherwise false
*/
- protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
- ResourceConfig resourceConfig) {
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+ protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow,
+ WorkflowConfig workflowConfig) {
+ return setResourceConfig(accessor, workflow, workflowConfig);
+ }
+
+ /**
+ * Remove a workflow config.
+ * @param accessor
+ * @param workflow
+ * @return
+ */
+ protected static boolean removeWorkflowConfig(HelixDataAccessor accessor, String workflow) {
+ return removeWorkflowJobConfig(accessor, workflow);
}
/**
@@ -199,14 +238,12 @@ public class TaskUtil {
* This method is internal API.
*
* @param propertyStore Property store for the cluster
- * @param jobResource The name of the job
+ * @param job The name of the job
* @return True if remove success, otherwise false
*/
protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
- String jobResource) {
- return propertyStore.remove(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
- AccessOption.PERSISTENT);
+ String job) {
+ return removeWorkflowJobContext(propertyStore, job);
}
/**
@@ -230,25 +267,25 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext();
*
* @param manager a connection to Helix
- * @param workflowResource the name of the workflow
+ * @param workflow the name of the workflow
* @return the {@link WorkflowContext}, or null if none is available
*/
- protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
- return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+ protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
+ return getWorkflowContext(manager.getHelixPropertyStore(), workflow);
}
/**
* Set the runtime context of a single workflow
*
* @param manager a connection to Helix
- * @param workflowResource the name of the workflow
- * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+ * @param workflow the name of the workflow
+ * @param workflowContext the up-to-date {@link WorkflowContext} for the workflow
*/
- protected static void setWorkflowContext(HelixManager manager, String workflowResource,
- WorkflowContext ctx) {
+ protected static void setWorkflowContext(HelixManager manager, String workflow,
+ WorkflowContext workflowContext) {
manager.getHelixPropertyStore().set(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
- ctx.getRecord(), AccessOption.PERSISTENT);
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE),
+ workflowContext.getRecord(), AccessOption.PERSISTENT);
}
/**
@@ -256,11 +293,11 @@ public class TaskUtil {
* This method is internal API.
*
* @param manager A connection to Helix
- * @param workflowResource The name of the workflow
+ * @param workflow The name of the workflow
* @return True if remove success, otherwise false
*/
- protected static boolean removeWorkflowContext(HelixManager manager, String workflowResource) {
- return removeWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+ protected static boolean removeWorkflowContext(HelixManager manager, String workflow) {
+ return removeWorkflowContext(manager.getHelixPropertyStore(), workflow);
}
/**
@@ -268,14 +305,12 @@ public class TaskUtil {
* This method is internal API.
*
* @param propertyStore Property store for the cluster
- * @param workflowResource The name of the workflow
+ * @param workflow The name of the workflow
* @return True if remove success, otherwise false
*/
protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
- String workflowResource) {
- return propertyStore.remove(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource),
- AccessOption.PERSISTENT);
+ String workflow) {
+ return removeWorkflowJobContext(propertyStore, workflow);
}
/**
@@ -303,8 +338,8 @@ public class TaskUtil {
protected static String getWorkflowJobUserContent(HelixManager manager,
String workflowJobResource, String key) {
ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
- AccessOption.PERSISTENT);
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE),
+ null, AccessOption.PERSISTENT);
return r != null ? r.getSimpleField(key) : null;
}
@@ -333,19 +368,19 @@ public class TaskUtil {
* Get user defined task level key-value pair data
*
* @param manager a connection to Helix
- * @param jobResource the name of job
- * @param taskResource the name of the task
+ * @param job the name of job
+ * @param task the name of the task
* @param key the key of key-value pair
*
* @return null if there is no such pair, otherwise return a String
*/
- protected static String getTaskUserContent(HelixManager manager, String jobResource,
- String taskResource, String key) {
+ protected static String getTaskUserContent(HelixManager manager, String job,
+ String task, String key) {
ZNRecord r = manager.getHelixPropertyStore().get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE),
- null, AccessOption.PERSISTENT);
- return r != null ? (r.getMapField(taskResource) != null
- ? r.getMapField(taskResource).get(key)
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+ AccessOption.PERSISTENT);
+ return r != null ? (r.getMapField(task) != null
+ ? r.getMapField(task).get(key)
: null) : null;
}
@@ -353,22 +388,22 @@ public class TaskUtil {
* Add an user defined key-value pair data to task level
*
* @param manager a connection to Helix
- * @param jobResource the name of job
- * @param taskResource the name of task
+ * @param job the name of job
+ * @param task the name of task
* @param key the key of key-value pair
* @param value the value of key-value pair
*/
- protected static void addTaskUserContent(final HelixManager manager, String jobResource,
- final String taskResource, final String key, final String value) {
+ protected static void addTaskUserContent(final HelixManager manager, String job,
+ final String task, final String key, final String value) {
String path =
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE);
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE);
manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
@Override public ZNRecord update(ZNRecord znRecord) {
- if (znRecord.getMapField(taskResource) == null) {
- znRecord.setMapField(taskResource, new HashMap<String, String>());
+ if (znRecord.getMapField(task) == null) {
+ znRecord.setMapField(task, new HashMap<String, String>());
}
- znRecord.getMapField(taskResource).put(key, value);
+ znRecord.getMapField(task).put(key, value);
return znRecord;
}
}, AccessOption.PERSISTENT);
@@ -386,25 +421,25 @@ public class TaskUtil {
/**
* Get a workflow-qualified job name for a job in that workflow
*
- * @param workflowResource the name of the workflow
+ * @param workflow the name of the workflow
* @param jobName the un-namespaced name of the job
* @return The namespaced job name, which is just workflowResource_jobName
*/
- public static String getNamespacedJobName(String workflowResource, String jobName) {
- return workflowResource + "_" + jobName;
+ public static String getNamespacedJobName(String workflow, String jobName) {
+ return workflow + "_" + jobName;
}
/**
* Remove the workflow namespace from the job name
*
- * @param workflowResource the name of the workflow that owns the job
+ * @param workflow the name of the workflow that owns the job
* @param jobName the namespaced job name
* @return the denamespaced job name, or the same job name if it is already denamespaced
*/
- public static String getDenamespacedJobName(String workflowResource, String jobName) {
- if (jobName.contains(workflowResource)) {
+ public static String getDenamespacedJobName(String workflow, String jobName) {
+ if (jobName.contains(workflow)) {
// skip the entire length of the work plus the underscore
- return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
+ return jobName.substring(jobName.indexOf(workflow) + workflow.length() + 1);
} else {
return jobName;
}
@@ -416,6 +451,8 @@ public class TaskUtil {
* @param commandConfig map of job config key to config value
* @return serialized string
*/
+ // TODO: move this to the JobConfig
+ @Deprecated
public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -433,6 +470,8 @@ public class TaskUtil {
* @param commandConfig the serialized job config map
* @return a map of job config key to config value
*/
+ // TODO: move this to the JobConfig
+ @Deprecated
public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -446,18 +485,13 @@ public class TaskUtil {
return Collections.emptyMap();
}
- private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- return accessor.getProperty(keyBuilder.resourceConfig(resource));
- }
-
/**
* Extracts the partition id from the given partition name.
*
* @param pName
* @return
*/
- public static int getPartitionId(String pName) {
+ protected static int getPartitionId(String pName) {
int index = pName.lastIndexOf("_");
if (index == -1) {
throw new HelixException("Invalid partition name " + pName);
@@ -465,12 +499,320 @@ public class TaskUtil {
return Integer.valueOf(pName.substring(index + 1));
}
- public static String getWorkflowContextKey(String resource) {
+ @Deprecated
+ public static String getWorkflowContextKey(String workflow) {
// TODO: fix this to use the keyBuilder.
- return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+ return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow);
}
- public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+ @Deprecated
+ public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, String workflow) {
return accessor.keyBuilder().resourceConfig(workflow);
}
+
+ /**
+ * Cleans up IdealState and external view associated with a job.
+ *
+ * @param accessor
+ * @param job
+ * @return true if the removal succeeded, otherwise false
+ */
+ protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) {
+ return cleanupIdealStateExtView(accessor, job);
+ }
+
+ /**
+ * Cleans up IdealState and external view associated with a workflow.
+ *
+ * @param accessor
+ * @param workflow
+ * @return true if the removal succeeded, otherwise false
+ */
+ protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor,
+ String workflow) {
+ return cleanupIdealStateExtView(accessor, workflow);
+ }
+
+ /**
+ * Cleans up IdealState and external view associated with a job/workflow resource.
+ */
+ private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor,
+ String workflowJobResource) {
+ boolean success = true;
+ PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
+ if (accessor.getProperty(isKey) != null) {
+ if (!accessor.removeProperty(isKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.",
+ workflowJobResource, isKey));
+ success = false;
+ }
+ }
+
+ // Delete external view
+ PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource);
+ if (accessor.getProperty(evKey) != null) {
+ if (!accessor.removeProperty(evKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.",
+ workflowJobResource, evKey));
+ success = false;
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ * Remove a workflow and all jobs in the workflow. This removes the workflow config, IdealState,
+ * ExternalView and context associated with the workflow, and all job information, including each
+ * job's config, context, IdealState and ExternalView.
+ *
+ * @param manager
+ * @param workflow the workflow name.
+ * @param jobs all job names in this workflow.
+ *
+ * @return true if the removal succeeded, otherwise false
+ */
+ protected static boolean removeWorkflow(final HelixManager manager, String workflow,
+ Set<String> jobs) {
+ boolean success = true;
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+ // clean up all jobs
+ for (String job : jobs) {
+ if (!removeJob(accessor, manager.getHelixPropertyStore(), job)) {
+ success = false;
+ }
+ }
+
+ if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove workflow idealstate/externalview for %s.",
+ workflow));
+ success = false;
+ }
+ if (!removeWorkflowConfig(accessor, workflow)) {
+ LOG.warn(
+ String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+ success = false;
+ }
+ if (!removeWorkflowContext(manager, workflow)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove workflow context for %s.", workflow));
+ success = false;
+ }
+
+ return success;
+ }
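
These removal helpers are protected and are driven by the rebalancer; external callers would normally go through TaskDriver instead, roughly as follows (a hedged sketch assuming a connected HelixManager named manager and a hypothetical queue name):

    TaskDriver driver = new TaskDriver(manager);
    driver.delete("myQueue"); // removes the queue/workflow and all of its jobs
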
+
+ /**
+ * Remove a set of jobs from a workflow. This removes the config, context, IdealState and
+ * ExternalView of each individual job, removes the jobs from the workflow's DAG in WorkflowConfig,
+ * and clears their states from the WorkflowContext.
+ *
+ * @param dataAccessor
+ * @param propertyStore
+ * @param jobs
+ * @param workflow
+ * @param maintainDependency
+ *
+ * @return true if the removal succeeded, otherwise false
+ */
+ protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor,
+ final HelixPropertyStore propertyStore, final String workflow, final Set<String> jobs,
+ boolean maintainDependency) {
+ boolean success = true;
+ if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
+ LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
+ + workflow);
+ success = false;
+ }
+ if (!removeJobsState(propertyStore, workflow, jobs)) {
+ LOG.warn(
+ "Error occurred while trying to remove jobs states from workflow + " + workflow + " jobs "
+ + jobs);
+ success = false;
+ }
+ for (String job : jobs) {
+ if (!removeJob(dataAccessor, propertyStore, job)) {
+ success = false;
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ * Return all jobs that are COMPLETED and have passed their expiry time.
+ *
+ * @param dataAccessor
+ * @param propertyStore
+ * @param workflowConfig
+ * @param workflowContext
+ *
+ * @return the set of namespaced names of jobs that are COMPLETED and expired
+ */
+ protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
+ HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ Set<String> expiredJobs = new HashSet<String>();
+
+ if (workflowContext != null) {
+ Map<String, TaskState> jobStates = workflowContext.getJobStates();
+ for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
+ JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+ long expiry = jobConfig.getExpiry();
+ if (expiry == WorkflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+ expiry = workflowConfig.getExpiry();
+ }
+ if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+ if (System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+ expiredJobs.add(job);
+ }
+ }
+ }
+ }
+ return expiredJobs;
+ }
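
In short, the job-level expiry wins unless it is unset (DEFAULT_EXPIRY) or negative, in which case the workflow-level expiry applies, and a COMPLETED job becomes purgeable once the clock reaches finishTime + expiry. A worked example with made-up numbers:

    long finishTime = 1000000L; // hypothetical job finish time (epoch ms)
    long expiry = 30000L;       // effective expiry after the fallback above
    boolean purgeable = System.currentTimeMillis() >= finishTime + expiry; // true from 1,030,000 ms onward
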
+
+ /* Remove the IdealState/ExternalView, config and context of a job. */
+ // The job name here must be the namespaced job name.
+ private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+ String job) {
+ boolean success = true;
+ if (!cleanupJobIdealStateExtView(accessor, job)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove job idealstate/externalview for %s.",
+ job));
+ success = false;
+ }
+ if (!removeJobConfig(accessor, job)) {
+ LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
+ success = false;
+ }
+ if (!removeJobContext(propertyStore, job)) {
+ LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+ success = false;
+ }
+
+ return success;
+ }
+
+ /** Remove the given jobs from the DAG in the workflow (queue) configuration */
+ // Job names here must be namespaced job names.
+ private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+ final Set<String> jobsToRemove, final boolean maintainDependency) {
+ // Now atomically clear the DAG
+ DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+ if (jobDag == null) {
+ LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+ return null;
+ }
+ for (String job : jobsToRemove) {
+ jobDag.removeNode(job, maintainDependency);
+ }
+ try {
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ return currentData;
+ }
+ };
+
+ String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
+ if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
+ LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+ return false;
+ }
+
+ return true;
+ }
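
The DataUpdater used above is the standard Helix idiom for an atomic read-modify-write on a ZNode; the same pattern in a stripped-down, hypothetical form (the simple field name is made up):

    DataUpdater<ZNRecord> touch = new DataUpdater<ZNRecord>() {
      @Override
      public ZNRecord update(ZNRecord current) {
        if (current != null) {
          // purely illustrative field; record when this node was last touched
          current.setSimpleField("LAST_TOUCHED", String.valueOf(System.currentTimeMillis()));
        }
        return current;
      }
    };
    accessor.getBaseDataAccessor().update(configPath, touch, AccessOption.PERSISTENT);
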
+
+ /**
+ * Update the workflow context to remove the given jobs from JOB_STATES if they were already recorded.
+ */
+ private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+ final String workflow, final Set<String> jobs) {
+ String contextPath =
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
+
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+ @Override public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ WorkflowContext workflowContext = new WorkflowContext(currentData);
+ workflowContext.removeJobStates(jobs);
+ currentData = workflowContext.getRecord();
+ }
+ return currentData;
+ }
+ };
+ if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
+ LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean removeWorkflowJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+ String workflowJobResource) {
+ String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
+ if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
+ if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
+ workflowJobResource, path));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Remove workflow or job config.
+ *
+ * @param accessor
+ * @param workflowJobResource the workflow or job name
+ */
+ private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
+ String workflowJobResource) {
+ PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
+ if (accessor.getProperty(cfgKey) != null) {
+ if (!accessor.removeProperty(cfgKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove config for %s. Failed to remove node %s.",
+ workflowJobResource, cfgKey));
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Set the resource config
+ * @param accessor Accessor to Helix configs
+ * @param resource The resource name
+ * @param resourceConfig The resource config to be set
+ * @return True if set successfully, otherwise false
+ */
+ private static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
+ ResourceConfig resourceConfig) {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+ }
+
+ private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.getProperty(keyBuilder.resourceConfig(resource));
+ }
}
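
Putting the new helpers together, the periodic purge this commit enables amounts to roughly the following (a hedged sketch; the methods are protected, so the real call sites live in the task package, e.g. the workflow rebalancer, and workflow/accessor/propertyStore are assumed to be in scope):

    Set<String> expired = TaskUtil.getExpiredJobs(accessor, propertyStore, workflowConfig, workflowContext);
    if (!expired.isEmpty()) {
      TaskUtil.removeJobsFromWorkflow(accessor, propertyStore, workflow, expired, true);
    }
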
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c6e971..80b5973 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -149,6 +149,14 @@ public class WorkflowConfig extends ResourceConfig {
.fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG;
}
+ protected void setJobDag(JobDag jobDag) {
+ try {
+ putSimpleConfig(WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+ } catch (IOException ex) {
+ throw new HelixException("Invalid job dag configuration!", ex);
+ }
+ }
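
A hedged sketch of how the new protected setter pairs with the DAG pruning in TaskUtil (callable only from within the task package; the job name is hypothetical):

    JobDag dag = workflowConfig.getJobDag();
    dag.removeNode("myQueue_backupJob", true); // keep the edges between the remaining jobs
    workflowConfig.setJobDag(dag);             // re-serializes the DAG into the Dag simple field
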
+
public int getParallelJobs() {
return _record
.getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index cc21ce3..563e2e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -20,6 +20,10 @@ package org.apache.helix.task;
*/
import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
@@ -36,7 +40,9 @@ public class WorkflowContext extends HelixProperty {
JOB_STATES,
LAST_SCHEDULED_WORKFLOW,
SCHEDULED_WORKFLOWS,
+ LAST_PURGE_TIME
}
+
public static final int UNSTARTED = -1;
public static final int UNFINISHED = -1;
@@ -45,41 +51,48 @@ public class WorkflowContext extends HelixProperty {
}
public void setWorkflowState(TaskState s) {
- if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+ String workflowState = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+ if (workflowState == null) {
_record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
- } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
- .equals(TaskState.FAILED.name()) && !_record
- .getSimpleField(WorkflowContextProperties.STATE.name())
+ } else if (!workflowState.equals(TaskState.FAILED.name()) && !workflowState
.equals(TaskState.COMPLETED.name())) {
_record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
}
}
public TaskState getWorkflowState() {
- String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
- if (s == null) {
- return null;
+ String state = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+ if (state == null) {
+ return TaskState.NOT_STARTED;
}
- return TaskState.valueOf(s);
+ return TaskState.valueOf(state);
}
- public void setJobState(String jobResource, TaskState s) {
+ public void setJobState(String job, TaskState s) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
states = new TreeMap<>();
_record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
}
- states.put(jobResource, s.name());
+ states.put(job, s.name());
+ }
+
+ protected void removeJobStates(Set<String> jobs) {
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ if (states != null) {
+ states.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+ }
}
- public TaskState getJobState(String jobResource) {
+ public TaskState getJobState(String job) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
return null;
}
- String s = states.get(jobResource);
+ String s = states.get(job);
if (s == null) {
return null;
}
@@ -89,7 +102,8 @@ public class WorkflowContext extends HelixProperty {
public Map<String, TaskState> getJobStates() {
Map<String, TaskState> jobStates = new HashMap<>();
- Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ Map<String, String> stateFieldMap =
+ _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (stateFieldMap != null) {
for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -131,17 +145,25 @@ public class WorkflowContext extends HelixProperty {
List<String> workflows = getScheduledWorkflows();
if (workflows == null) {
workflows = new ArrayList<>();
- _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
}
workflows.add(workflow);
+ _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
}
public String getLastScheduledSingleWorkflow() {
return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
}
+ protected void setLastJobPurgeTime(long epochTime) {
+ _record.setSimpleField(WorkflowContextProperties.LAST_PURGE_TIME.name(),
+ String.valueOf(epochTime));
+ }
+
+ public long getLastJobPurgeTime() {
+ return _record.getLongField(WorkflowContextProperties.LAST_PURGE_TIME.name(), -1);
+ }
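
LAST_PURGE_TIME lets the rebalancer throttle how often it scans a queue for expired jobs; a hedged sketch of that gating (the interval below is made up):

    long purgeInterval = 60 * 1000L; // hypothetical: purge at most once per minute
    if (System.currentTimeMillis() - workflowContext.getLastJobPurgeTime() >= purgeInterval) {
      // collect and remove expired jobs here, then record the pass
      workflowContext.setLastJobPurgeTime(System.currentTimeMillis());
    }
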
+
public List<String> getScheduledWorkflows() {
return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
}
-
}