You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/09 18:58:33 UTC
[1/4] helix git commit: Persist preference list into IdealState in
full-auto mode and allow user to choose persisting either bestpossible or
intermediate state mapping into the mapfield of IS.
Repository: helix
Updated Branches:
refs/heads/master d2c3ebb48 -> 188969926
Persist the preference list into the IdealState (IS) in full-auto mode, and allow the user to choose whether the best-possible or the intermediate state mapping is persisted into the map field of the IS.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/18896992
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/18896992
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/18896992
Branch: refs/heads/master
Commit: 1889699269597dd5cb8b10e2eb90dcbcf6c64622
Parents: 0aeb557
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Feb 10 17:28:40 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700
----------------------------------------------------------------------
.../stages/BestPossibleStateCalcStage.java | 10 +++----
.../stages/PersistAssignmentStage.java | 2 +-
.../org/apache/helix/model/ClusterConfig.java | 5 +++-
.../java/org/apache/helix/model/IdealState.java | 29 +++++++++++---------
.../apache/helix/task/WorkflowRebalancer.java | 6 ----
.../TestRebalancerPersistAssignments.java | 2 ++
.../integration/task/TestJobQueueCleanUp.java | 4 +--
.../integration/task/TestRecurringJobQueue.java | 1 +
8 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 0a13a8d..5307e1a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -144,23 +144,23 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
if (rebalancer != null && mappingCalculator != null) {
-
if (rebalancer instanceof TaskRebalancer) {
TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
taskRebalancer.setClusterStatusMonitor(
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"));
}
-
try {
HelixManager manager = event.getAttribute("helixmanager");
rebalancer.init(manager);
- idealState = rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+ idealState =
+ rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+
output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
// Use the internal MappingCalculator interface to compute the final assignment
// The next release will support rebalancers that compute the mapping from start to finish
- ResourceAssignment partitionStateAssignment =
- mappingCalculator.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+ ResourceAssignment partitionStateAssignment = mappingCalculator
+ .computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
for (Partition partition : resource.getPartitions()) {
Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
output.setState(resourceName, partition, newStateMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index cd320a4..fd4c706 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.DataUpdater;
+
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
@@ -87,7 +88,6 @@ public class PersistAssignmentStage extends AbstractBaseStage {
needPersist = true;
}
}
-
PartitionStateMap partitionStateMap =
bestPossibleAssignment.getPartitionStateMap(resourceId);
if (clusterConfig.isPersistIntermediateAssignment()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 4b217cd..8ee3a79 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -79,7 +79,8 @@ public class ClusterConfig extends HelixProperty {
/**
* Enable/Disable persist best possible assignment in a resource's idealstate.
- *
+ * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+ * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
* @return
*/
public void setPersistBestPossibleAssignment(Boolean enable) {
@@ -104,6 +105,8 @@ public class ClusterConfig extends HelixProperty {
/**
* Enable/Disable persist IntermediateAssignment in a resource's idealstate.
+ * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+ * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
*
* @return
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 48e43d6..ccc8a0a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -351,9 +351,11 @@ public class IdealState extends HelixProperty {
}
/**
- * Get the current mapping of a partition
- * CAUTION: In FULL-AUTO mode, this method could return empty map if
- * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+ * Get the current mapping of a partition.
+ *
+ * CAUTION: In FULL-AUTO mode, this method
+ * could return empty map if neither {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)}
+ * nor {@link ClusterConfig#setPersistIntermediateAssignment(Boolean)} is set to true.
*
* @param partitionName the name of the partition
* @return the instances where the replicas live and the state of each
@@ -375,25 +377,25 @@ public class IdealState extends HelixProperty {
/**
* Get the instances who host replicas of a partition.
- * CAUTION: In FULL-AUTO mode, this method could return empty map if
- * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
- +
- * @param partitionName the partition to look up
+ * CAUTION: In FULL-AUTO mode, this method
+ * could return empty set if neither {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)}
+ * nor {@link ClusterConfig#setPersistIntermediateAssignment(Boolean)} is set to true.
+ *
* @return set of instance names
*/
public Set<String> getInstanceSet(String partitionName) {
- switch (getRebalanceMode()) {
+ switch(getRebalanceMode()) {
case FULL_AUTO:
case SEMI_AUTO:
case USER_DEFINED:
case TASK:
List<String> prefList = _record.getListField(partitionName);
if (prefList != null && !prefList.isEmpty()) {
- return new TreeSet<String>(prefList);
+ return new TreeSet<>(prefList);
} else {
Map<String, String> stateMap = _record.getMapField(partitionName);
if (stateMap != null && !stateMap.isEmpty()) {
- return new TreeSet<String>(stateMap.keySet());
+ return new TreeSet<>(stateMap.keySet());
} else {
logger.warn(partitionName + " does NOT exist");
}
@@ -402,21 +404,22 @@ public class IdealState extends HelixProperty {
case CUSTOMIZED:
Map<String, String> stateMap = _record.getMapField(partitionName);
if (stateMap != null) {
- return new TreeSet<String>(stateMap.keySet());
+ return new TreeSet<>(stateMap.keySet());
} else {
logger.warn(partitionName + " does NOT exist");
}
break;
case NONE:
default:
- logger.error("Invalid ideal state mode: " + getResourceName());
+ logger.warn("Invalid ideal state mode: " + getResourceName());
break;
}
return Collections.emptySet();
}
- /** Set the preference list of a partition
+ /**
+ * Set the preference list of a partition
* @param partitionName the name of the partition
* @param instanceList the instance preference list
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index ac2ac87..6b95658 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -443,14 +443,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
-<<<<<<< HEAD
- * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
- * contexts associated with this workflow, and all jobs information, including their configs,
- * context, IS and EV.
-=======
* Cleans up workflow configs and workflow contexts associated with this workflow, including all
* job-level configs and context, plus workflow-level information.
->>>>>>> Support configurable job purge interval for a queue.
*/
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 2a9dc69..01aec14 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -24,8 +24,10 @@ import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 6eecf20..018d071 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -86,11 +86,11 @@ public class TestJobQueueCleanUp extends TaskTestBase {
String queueName = TestHelper.getTestMethodName();
JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
- cfgBuilder.setJobPurgeInterval(1000);
+ cfgBuilder.setJobPurgeInterval(500);
builder.setWorkflowConfig(cfgBuilder.build());
JobConfig.Builder jobBuilder =
- new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ new JobConfig.Builder().setNumberOfTasks(1)
.setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
.setExpiry(200L);
http://git-wip-us.apache.org/repos/asf/helix/blob/18896992/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index a1070d8..8253e18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -276,6 +276,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
// Record all scheduled workflows
wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
final List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
+ Assert.assertFalse(scheduledWorkflows.size() > 2);
// Delete recurrent workflow
_driver.delete(queueName);
[4/4] helix git commit: Clean up jobs in a jobqueue automatically
after the job completes and passes its expiry time.
Posted by jx...@apache.org.
Clean up jobs in a jobqueue automatically after the job completes and passes its expiry time.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e530bf51
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e530bf51
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e530bf51
Branch: refs/heads/master
Commit: e530bf5183e7ad2f3a27d0e75448b88e8554efe8
Parents: d2c3ebb
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jan 30 15:02:24 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700
----------------------------------------------------------------------
.../webapp/resources/JobQueueResource.java | 2 +-
.../helix/task/DeprecatedTaskRebalancer.java | 4 +-
.../main/java/org/apache/helix/task/JobDag.java | 48 +-
.../org/apache/helix/task/JobRebalancer.java | 22 +-
.../java/org/apache/helix/task/TaskDriver.java | 437 ++++++-----------
.../org/apache/helix/task/TaskRebalancer.java | 46 +-
.../org/apache/helix/task/TaskStateModel.java | 2 +-
.../java/org/apache/helix/task/TaskUtil.java | 486 ++++++++++++++++---
.../org/apache/helix/task/WorkflowConfig.java | 8 +
.../org/apache/helix/task/WorkflowContext.java | 52 +-
.../apache/helix/task/WorkflowRebalancer.java | 193 +++-----
.../java/org/apache/helix/tools/TaskAdmin.java | 2 +-
.../helix/integration/task/TaskTestUtil.java | 14 +-
.../integration/task/TestJobQueueCleanUp.java | 4 +-
.../apache/helix/task/TestCleanExpiredJobs.java | 100 +++-
15 files changed, 838 insertions(+), 582 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 32b782d..3df78f2 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -180,7 +180,7 @@ public class JobQueueResource extends ServerResource {
break;
}
case clean: {
- driver.cleanupJobQueue(jobQueueName);
+ driver.cleanupQueue(jobQueueName);
break;
}
default:
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 14c559c..8624398 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -125,7 +125,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
LOG.debug("Computer Best Partition for resource: " + resourceName);
// Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+ JobConfig jobCfg = TaskUtil.getJobConfig(_manager, resourceName);
if (jobCfg == null) {
LOG.debug("Job configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
@@ -133,7 +133,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
if (workflowCfg == null) {
LOG.debug("Workflow configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 32e1ffa..98a8c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -34,6 +35,8 @@ import org.codehaus.jackson.map.ObjectMapper;
* and validate a job dependency graph
*/
public class JobDag {
+ private static final Logger LOG = Logger.getLogger(JobDag.class);
+
@JsonProperty("parentsToChildren")
private Map<String, Set<String>> _parentsToChildren;
@@ -66,7 +69,7 @@ public class JobDag {
_allNodes.add(child);
}
- public void removeParentToChild(String parent, String child) {
+ private void removeParentToChild(String parent, String child) {
if (_parentsToChildren.containsKey(parent)) {
Set<String> children = _parentsToChildren.get(parent);
children.remove(child);
@@ -91,7 +94,7 @@ public class JobDag {
/**
* must make sure no other node dependence before removing the node
*/
- public void removeNode(String node) {
+ private void removeNode(String node) {
if (_parentsToChildren.containsKey(node) || _childrenToParents.containsKey(node)) {
throw new IllegalStateException(
"The node is either a parent or a child of other node, could not be deleted");
@@ -100,6 +103,47 @@ public class JobDag {
_allNodes.remove(node);
}
+ /**
+ * Remove a node from the DAG.
+ * @param job
+ * @param maintainDependency: if true, the removed job's parent and child node will be linked together,
+ * otherwise, the job will be removed directly without modifying the existing dependency links.
+ */
+ public void removeNode(String job, boolean maintainDependency) {
+ if (!_allNodes.contains(job)) {
+ LOG.info("Could not delete job " + job + " from DAG, node does not exist");
+ return;
+ }
+ if (maintainDependency) {
+ String parent = null;
+ String child = null;
+ // remove the node from the queue
+ for (String n : _allNodes) {
+ if (getDirectChildren(n).contains(job)) {
+ parent = n;
+ removeParentToChild(parent, job);
+ } else if (getDirectParents(n).contains(job)) {
+ child = n;
+ removeParentToChild(job, child);
+ }
+ }
+ if (parent != null && child != null) {
+ addParentToChild(parent, child);
+ }
+ removeNode(job);
+ } else {
+ for (String child : getDirectChildren(job)) {
+ getChildrenToParents().get(child).remove(job);
+ }
+ for (String parent : getDirectParents(job)) {
+ getParentsToChildren().get(parent).remove(job);
+ }
+ _childrenToParents.remove(job);
+ _parentsToChildren.remove(job);
+ removeNode(job);
+ }
+ }
+
public Map<String, Set<String>> getParentsToChildren() {
return _parentsToChildren;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index c8deb35..d9093b9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import java.util.TreeSet;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
@@ -69,7 +70,7 @@ public class JobRebalancer extends TaskRebalancer {
LOG.debug("Computer Best Partition for job: " + jobName);
// Fetch job configuration
- JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+ JobConfig jobCfg = TaskUtil.getJobConfig(_manager, jobName);
if (jobCfg == null) {
LOG.error("Job configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -77,7 +78,7 @@ public class JobRebalancer extends TaskRebalancer {
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflowResource);
if (workflowCfg == null) {
LOG.error("Workflow configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -105,7 +106,7 @@ public class JobRebalancer extends TaskRebalancer {
LOG.info(String.format(
"Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
workflowResource, jobName, workflowState, jobState));
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
_scheduledRebalancer.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
@@ -358,7 +359,7 @@ public class JobRebalancer extends TaskRebalancer {
addAllPartitions(allPartitions, partitionsToDropFromIs);
// remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
return buildEmptyAssignment(jobResource, currStateOutput);
} else {
skippedPartitions.add(pId);
@@ -397,7 +398,7 @@ public class JobRebalancer extends TaskRebalancer {
markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
// remove IdealState of this job
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+ TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
}
// Make additional task assignments if needed.
@@ -693,7 +694,7 @@ public class JobRebalancer extends TaskRebalancer {
}
for (Partition partition : assignment.getMappedPartitions()) {
- int pId = TaskUtil.getPartitionId(partition.getPartitionName());
+ int pId = getPartitionId(partition.getPartitionName());
if (includeSet.contains(pId)) {
Map<String, String> replicaMap = assignment.getReplicaMap(partition);
for (String instance : replicaMap.keySet()) {
@@ -707,6 +708,15 @@ public class JobRebalancer extends TaskRebalancer {
return result;
}
+ /* Extracts the partition id from the given partition name. */
+ private static int getPartitionId(String pName) {
+ int index = pName.lastIndexOf("_");
+ if (index == -1) {
+ throw new HelixException("Invalid partition name " + pName);
+ }
+ return Integer.valueOf(pName.substring(index + 1));
+ }
+
private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
Set<Integer> nonReadyPartitions = Sets.newHashSet();
for (int p : ctx.getPartitionSet()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index df5cdf6..15c906a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -88,7 +88,7 @@ public class TaskDriver {
public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+ new ZkHelixPropertyStore<>(baseAccessor,
PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
}
@@ -136,7 +136,7 @@ public class TaskDriver {
newWorkflowConfig.setJobTypes(jobTypes);
// add workflow config.
- if (!TaskUtil.setResourceConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+ if (!TaskUtil.setWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
}
@@ -171,7 +171,7 @@ public class TaskDriver {
*/
public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
WorkflowConfig currentConfig =
- TaskUtil.getWorkflowCfg(_accessor, workflow);
+ TaskUtil.getWorkflowConfig(_accessor, workflow);
if (currentConfig == null) {
throw new HelixException("Workflow " + workflow + " does not exist!");
}
@@ -181,7 +181,9 @@ public class TaskDriver {
"Workflow " + workflow + " is terminable, not allow to change its configuration!");
}
- if (!TaskUtil.setResourceConfig(_accessor, workflow, newWorkflowConfig)) {
+ // Should not let user changing DAG in the workflow
+ newWorkflowConfig.setJobDag(currentConfig.getJobDag());
+ if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
LOG.error("Failed to update workflow configuration for workflow " + workflow);
}
@@ -198,283 +200,124 @@ public class TaskDriver {
}
/**
- * Remove all jobs in a job queue
+ * Remove all jobs that are in a final state (ABORTED, FAILED, COMPLETED) from a job queue.
+ * Equivalent to {@link #cleanupQueue(String)}.
*
- * @param queueName
+ * @param queue name of the queue
* @throws Exception
*/
- // TODO: need to make sure the queue is stopped or completed before flush the queue.
- public void flushQueue(String queueName) {
- WorkflowConfig config =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
- if (config == null) {
- throw new IllegalArgumentException("Queue does not exist!");
- }
-
- // Remove all ideal states and resource configs to trigger a drop event
- PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
- final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
- for (String resourceName : toRemove) {
- _accessor.removeProperty(keyBuilder.idealStates(resourceName));
- _accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
- // Delete context
- String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
- _propertyStore.remove(contextKey, AccessOption.PERSISTENT);
- }
-
- // Now atomically clear the DAG
- String path = keyBuilder.resourceConfig(queueName).getPath();
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- for (String resourceName : toRemove) {
- for (String child : jobDag.getDirectChildren(resourceName)) {
- jobDag.getChildrenToParents().get(child).remove(resourceName);
- }
- for (String parent : jobDag.getDirectParents(resourceName)) {
- jobDag.getParentsToChildren().get(parent).remove(resourceName);
- }
- jobDag.getChildrenToParents().remove(resourceName);
- jobDag.getParentsToChildren().remove(resourceName);
- jobDag.getAllNodes().remove(resourceName);
- }
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
- return currentData;
- }
- };
- _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
-
- // Now atomically clear the results
- path = Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
- updater = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- Map<String, String> states =
- currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
- if (states != null) {
- states.keySet().removeAll(toRemove);
- }
- return currentData;
- }
- };
- _propertyStore.update(path, updater, AccessOption.PERSISTENT);
+ public void flushQueue(String queue) {
+ cleanupQueue(queue);
}
/**
* Delete a job from an existing named queue,
* the queue has to be stopped prior to this call
*
- * @param queueName
- * @param jobName
+ * @param queue queue name
+ * @param job job name
*/
- public void deleteJob(final String queueName, final String jobName) {
+ public void deleteJob(final String queue, final String job) {
WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
+ TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowCfg == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
}
if (workflowCfg.isTerminable()) {
- throw new IllegalArgumentException(queueName + " is not a queue!");
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
boolean isRecurringWorkflow =
(workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
if (isRecurringWorkflow) {
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
-
- String lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-
- // delete the current scheduled one
- deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true);
-
- // Remove the job from the original queue template's DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the template job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // Delete the job template from property store
- String jobPropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
- _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
- } else {
- deleteJobFromScheduledQueue(queueName, jobName, false);
- }
- }
-
- /**
- * delete a job from a scheduled (non-recurrent) queue.
- *
- * @param queueName
- * @param jobName
- */
- private void deleteJobFromScheduledQueue(final String queueName, final String jobName,
- boolean isRecurrent) {
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_accessor, queueName);
-
- if (workflowCfg == null) {
- // When try to delete recurrent job, it could be either not started or finished. So
- // there may not be a workflow config.
- if (isRecurrent) {
- return;
- } else {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ // Delete the job from the last scheduled queue, if one exists.
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ String lastScheduledQueue = null;
+ if (wCtx != null) {
+ lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ }
+ if (lastScheduledQueue != null) {
+ WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
+ if (lastWorkflowCfg != null) {
+ deleteJobFromQueue(lastScheduledQueue, job);
+ }
}
}
+ deleteJobFromQueue(queue, job);
+ }
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
- if (wCtx != null && wCtx.getWorkflowState() == null) {
- throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
- }
-
- String workflowState =
- (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+ private void deleteJobFromQueue(final String queue, final String job) {
+ WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ String workflowState = (workflowCtx != null)
+ ? workflowCtx.getWorkflowState().name()
+ : TaskState.NOT_STARTED.name();
if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
- throw new IllegalStateException("Queue " + queueName + " is still in progress!");
+ throw new IllegalStateException("Queue " + queue + " is still running!");
}
- removeJob(queueName, jobName);
- }
-
- private void removeJob(String queueName, String jobName) {
- // Remove the job from the queue in the DAG
- removeJobFromDag(queueName, jobName);
-
- // delete the ideal state and resource config for the job
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- _admin.dropResource(_clusterName, namespacedJobName);
-
- // update queue's property to remove job from JOB_STATES if it is already started.
- removeJobStateFromQueue(queueName, jobName);
-
- // Delete the job from property store
- TaskUtil.removeJobContext(_propertyStore, namespacedJobName);
- }
-
- /** Remove the job name from the DAG from the queue configuration */
- private void removeJobFromDag(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
-
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
- return null;
- }
- // Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- Set<String> allNodes = jobDag.getAllNodes();
- if (!allNodes.contains(namespacedJobName)) {
- LOG.warn(
- "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
- return currentData;
- }
- String parent = null;
- String child = null;
- // remove the node from the queue
- for (String node : allNodes) {
- if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
- parent = node;
- jobDag.removeParentToChild(parent, namespacedJobName);
- } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
- child = node;
- jobDag.removeParentToChild(namespacedJobName, child);
- }
- }
- if (parent != null && child != null) {
- jobDag.addParentToChild(parent, child);
- }
- jobDag.removeNode(namespacedJobName);
-
- // Save the updated DAG
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- throw new IllegalStateException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
- }
- return currentData;
- }
- };
-
- String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
- if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
- throw new IllegalArgumentException(
- "Could not remove job " + jobName + " from DAG of queue " + queueName);
+ if (workflowState.equals(TaskState.COMPLETED.name()) || workflowState.equals(
+ TaskState.FAILED.name()) || workflowState.equals(TaskState.ABORTED.name())) {
+ LOG.warn("Queue " + queue + " has already reached its final state, skip deleting job from it.");
+ return;
}
- }
-
- /** update queue's property to remove job from JOB_STATES if it is already started. */
- private void removeJobStateFromQueue(final String queueName, final String jobName) {
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
- String queuePropertyPath =
- Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- Map<String, String> states =
- currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name());
- if (states != null && states.containsKey(namespacedJobName)) {
- states.keySet().remove(namespacedJobName);
- }
- }
- return currentData;
- }
- };
- if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) {
- LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName);
+ String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+ Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+ if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
+ LOG.error("Failed to delete job " + job + " from queue " + queue);
+ throw new HelixException("Failed to delete job " + job + " from queue " + queue);
}
}
/**
* Adds a new job to the end an existing named queue.
*
- * @param queueName
- * @param jobName
+ * @param queue name of the queue to append the job to
+ * @param job name of the job (not yet namespaced with the queue name)
* @param jobBuilder
* @throws Exception
*/
- public void enqueueJob(final String queueName, final String jobName,
+ public void enqueueJob(final String queue, final String job,
JobConfig.Builder jobBuilder) {
// Get the job queue config and capacity
- WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
- throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
+ throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
}
- boolean isTerminable = workflowConfig.isTerminable();
- if (isTerminable) {
- throw new IllegalArgumentException(queueName + " is not a queue!");
+ if (workflowConfig.isTerminable()) {
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
final int capacity = workflowConfig.getCapacity();
+ int queueSize = workflowConfig.getJobDag().size();
+ if (capacity > 0 && queueSize >= capacity) {
+ // If the queue is full, try to clean up expired jobs to free up space before giving up.
+ WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ if (workflowContext != null) {
+ Set<String> expiredJobs =
+ TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
+ if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
+ LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
+ }
+ }
+ workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+ if (workflowConfig.getJobDag().size() >= capacity) {
+ throw new HelixException("Failed to enqueue a job, queue is full.");
+ }
+ }
// Create the job to ensure that it validates
- JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
-
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+ JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
+ final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
// add job config first.
addJobConfig(namespacedJobName, jobConfig);
final String jobType = jobConfig.getJobType();
- // Add the job to the end of the queue in the DAG
+ // update the job dag to append the job to the end of the queue.
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
@@ -484,11 +327,11 @@ public class TaskDriver {
Set<String> allNodes = jobDag.getAllNodes();
if (capacity > 0 && allNodes.size() >= capacity) {
throw new IllegalStateException(
- "Queue " + queueName + " is at capacity, will not add " + jobName);
+ "Queue " + queue + " already reaches its max capacity, failed to add " + job);
}
if (allNodes.contains(namespacedJobName)) {
throw new IllegalStateException(
- "Could not add to queue " + queueName + ", job " + jobName + " already exists");
+ "Could not add to queue " + queue + ", job " + job + " already exists");
}
jobDag.addNode(namespacedJobName);
@@ -511,7 +354,7 @@ public class TaskDriver {
if (jobTypes == null) {
jobTypes = new HashMap<String, String>();
}
- jobTypes.put(jobName, jobType);
+ jobTypes.put(queue, jobType);
currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
@@ -520,51 +363,58 @@ public class TaskDriver {
currentData
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
- throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+ throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
e);
}
return currentData;
}
};
- String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+ String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
- throw new IllegalArgumentException("Could not enqueue job");
+ throw new HelixException("Failed to enqueue job");
}
// This is to make it back-compatible with old Helix task driver.
- addWorkflowResourceIfNecessary(queueName);
+ addWorkflowResourceIfNecessary(queue);
// Schedule the job
- RebalanceScheduler.invokeRebalance(_accessor, queueName);
+ RebalanceScheduler.invokeRebalance(_accessor, queue);
}
/**
- * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue.
- * The job config, job context will be removed from Zookeeper.
+ * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The
+ * job config, job context will be removed from Zookeeper.
*
- * @param queueName The name of job queue
+ * @param queue The name of job queue
*/
- public void cleanupJobQueue(String queueName) {
- WorkflowConfig workflowCfg =
- TaskUtil.getWorkflowCfg(_accessor, queueName);
+ public void cleanupQueue(String queue) {
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
- if (workflowCfg == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ if (workflowConfig == null) {
+ throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+ }
+
+ boolean isTerminable = workflowConfig.isTerminable();
+ if (isTerminable) {
+ throw new IllegalArgumentException(queue + " is not a queue!");
}
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
- if (wCtx != null && wCtx.getWorkflowState() == null) {
- throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
+ if (wCtx == null || wCtx.getWorkflowState() == null) {
+ throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
}
- for (String jobNode : workflowCfg.getJobDag().getAllNodes()) {
+ Set<String> jobs = new HashSet<String>();
+ for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
TaskState curState = wCtx.getJobState(jobNode);
if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
|| curState == TaskState.FAILED)) {
- removeJob(queueName, TaskUtil.getDenamespacedJobName(queueName, jobNode));
+ jobs.add(jobNode);
}
}
+
+ TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true);
}
/** Posts new workflow resource to cluster */
@@ -577,7 +427,6 @@ public class TaskDriver {
.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
_admin.setResourceIdealState(_clusterName, workflow, is);
-
}
/**
@@ -605,13 +454,13 @@ public class TaskDriver {
/**
* Add new job config to cluster
*/
- private void addJobConfig(String jobName, JobConfig jobConfig) {
- LOG.info("Add job configuration " + jobName);
+ private void addJobConfig(String job, JobConfig jobConfig) {
+ LOG.info("Add job configuration " + job);
// Set the job configuration
- JobConfig newJobCfg = new JobConfig(jobName, jobConfig);
- if (!TaskUtil.setResourceConfig(_accessor, jobName, newJobCfg)) {
- LOG.error("Failed to add job configuration for job " + jobName);
+ JobConfig newJobCfg = new JobConfig(job, jobConfig);
+ if (!TaskUtil.setJobConfig(_accessor, job, newJobCfg)) {
+ throw new HelixException("Failed to add job configuration for job " + job);
}
}
@@ -691,14 +540,15 @@ public class TaskDriver {
/**
* Helper function to change target state for a given workflow
*/
- private void setWorkflowTargetState(String workflowName, TargetState state) {
- setSingleWorkflowTargetState(workflowName, state);
-
- // TODO: just need to change the lastScheduledWorkflow.
- List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
- for (String resource : resources) {
- if (resource.startsWith(workflowName)) {
- setSingleWorkflowTargetState(resource, state);
+ private void setWorkflowTargetState(String workflow, TargetState state) {
+ setSingleWorkflowTargetState(workflow, state);
+
+ // For recurring schedules, last scheduled incomplete workflow must also be handled
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+ if (wCtx != null) {
+ String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+ if (lastScheduledWorkflow != null) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
}
}
}
@@ -706,42 +556,47 @@ public class TaskDriver {
/**
* Helper function to change target state for a given workflow
*/
- private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
- LOG.info("Set " + workflowName + " to target state " + state);
+ private void setSingleWorkflowTargetState(String workflow, final TargetState state) {
+ LOG.info("Set " + workflow + " to target state " + state);
+
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
+ if (workflowConfig == null) {
+ LOG.warn("WorkflowConfig for " + workflow + " not found!");
+ return;
+ }
+
+ WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+ if (state != TargetState.DELETE && workflowContext != null &&
+ (workflowContext.getFinishTime() != WorkflowContext.UNFINISHED
+ || workflowContext.getWorkflowState() == TaskState.COMPLETED
+ || workflowContext.getWorkflowState() == TaskState.FAILED)) {
+ // Should not update target state for completed workflow
+ LOG.info("Workflow " + workflow + " is already completed, skip to update its target state "
+ + state);
+ return;
+ }
+
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
- // Only update target state for non-completed workflows
- String finishTime = currentData
- .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name());
- if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
- currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
- state.name());
- } else {
- LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
- }
+ currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+ state.name());
} else {
- LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
+ LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is "
+ + currentData);
}
return currentData;
}
};
- List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- List<String> paths = Lists.newArrayList();
-
- PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
- if (_accessor.getProperty(cfgKey) != null) {
- paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
- updaters.add(updater);
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- RebalanceScheduler.invokeRebalance(_accessor, workflowName);
- } else {
- LOG.error("Configuration path " + cfgKey + " not found!");
- }
+
+ PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
+ _accessor.getBaseDataAccessor()
+ .update(workflowConfigKey.getPath(), updater, AccessOption.PERSISTENT);
+ RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
public WorkflowConfig getWorkflowConfig(String workflow) {
- return TaskUtil.getWorkflowCfg(_accessor, workflow);
+ return TaskUtil.getWorkflowConfig(_accessor, workflow);
}
public WorkflowContext getWorkflowContext(String workflow) {
@@ -749,7 +604,7 @@ public class TaskDriver {
}
public JobConfig getJobConfig(String job) {
- return TaskUtil.getJobCfg(_accessor, job);
+ return TaskUtil.getJobConfig(_accessor, job);
}
public JobContext getJobContext(String job) {
@@ -761,7 +616,7 @@ public class TaskDriver {
}
public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
- return TaskUtil.getWorkflowCfg(manager, workflow);
+ return TaskUtil.getWorkflowConfig(manager, workflow);
}
public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
@@ -769,7 +624,7 @@ public class TaskDriver {
}
public static JobConfig getJobConfig(HelixManager manager, String job) {
- return TaskUtil.getJobCfg(manager, job);
+ return TaskUtil.getJobConfig(manager, job);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 312c499..20a9233 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -23,10 +23,8 @@ import java.util.Date;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -47,6 +45,7 @@ import com.google.common.collect.Maps;
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
public static final String START_TIME_KEY = "StartTime";
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+ protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
// For connection management
protected HelixManager _manager;
@@ -84,7 +83,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
ctx.setJobState(jobToFail, TaskState.ABORTED);
_clusterStatusMonitor
- .updateJobCounters(TaskUtil.getJobCfg(_manager, jobToFail), TaskState.ABORTED);
+ .updateJobCounters(TaskUtil.getJobConfig(_manager, jobToFail), TaskState.ABORTED);
}
}
return true;
@@ -171,7 +170,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// If there is parent job failed, schedule the job only when ignore dependent
// job failure enabled
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
markJobFailed(job, null, workflowCfg, workflowCtx);
LOG.debug(
@@ -239,7 +238,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
protected void scheduleJobCleanUp(String jobName, WorkflowConfig workflowConfig,
long currentTime) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, jobName);
long currentScheduledTime =
_scheduledRebalancer.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
? Long.MAX_VALUE
@@ -262,41 +261,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
}
- /**
- * Cleans up IdealState and external view associated with a job/workflow resource.
- */
- protected static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
- LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
-
- // Delete the ideal state itself.
- PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
- if (accessor.getProperty(isKey) != null) {
- if (!accessor.removeProperty(isKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
- resourceName, isKey));
- }
- } else {
- LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
- }
-
- // Delete dead external view
- // because job is already completed, there is no more current state change
- // thus dead external views removal will not be triggered
- PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
- if (accessor.getProperty(evKey) != null) {
- if (!accessor.removeProperty(evKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
- resourceName, evKey));
- }
- }
-
- LOG.info(String
- .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
- }
-
- @Override public IdealState computeNewIdealState(String resourceName,
+ @Override
+ public IdealState computeNewIdealState(String resourceName,
IdealState currentIdealState, CurrentStateOutput currentStateOutput,
ClusterDataCache clusterData) {
// All of the heavy lifting is in the ResourceAssignment computation,
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a7c58d2..61e0394 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -268,7 +268,7 @@ public class TaskStateModel extends StateModel {
}
private void startTask(Message msg, String taskPartition) {
- JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+ JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
TaskConfig taskConfig = null;
String command = cfg.getCommand();
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index effdd44..f064bbf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,13 @@ package org.apache.helix.task;
*/
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
@@ -38,6 +41,7 @@ import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -59,12 +63,12 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
*
* @param accessor Accessor to access Helix configs
- * @param jobResource The name of the job resource
+ * @param job The name of the job resource
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- protected static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
- HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
+ protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) {
+ HelixProperty jobResourceConfig = getResourceConfig(accessor, job);
if (jobResourceConfig == null) {
return null;
}
@@ -76,12 +80,38 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getJobConfig();
*
* @param manager HelixManager object used to connect to Helix.
- * @param jobResource The name of the job resource.
+ * @param job The name of the job resource.
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- protected static JobConfig getJobCfg(HelixManager manager, String jobResource) {
- return getJobCfg(manager.getHelixDataAccessor(), jobResource);
+ protected static JobConfig getJobConfig(HelixManager manager, String job) {
+ return getJobConfig(manager.getHelixDataAccessor(), job);
+ }
+
+ /**
+ * Set the job config
+ *
+ * @param accessor Accessor to Helix configs
+ * @param job The job name
+ * @param jobConfig The job config to be set
+ *
+ * @return True if set successfully, otherwise false
+ */
+ protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
+ JobConfig jobConfig) {
+ return setResourceConfig(accessor, job, jobConfig);
+ }
+
+ /**
+ * Remove a job config.
+ *
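+ * @param accessor Accessor to Helix configs
+ * @param job The job name
+ *
+ * @return result of the underlying removal; presumably true if removed successfully (TODO confirm)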
+ */
+ protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) {
+ return removeWorkflowJobConfig(accessor, job);
}
/**
@@ -93,7 +123,7 @@ public class TaskUtil {
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- protected static WorkflowConfig getWorkflowCfg(HelixDataAccessor accessor, String workflow) {
+ protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) {
HelixProperty workflowCfg = getResourceConfig(accessor, workflow);
if (workflowCfg == null) {
return null;
@@ -111,21 +141,30 @@ public class TaskUtil {
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- protected static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflow) {
- return getWorkflowCfg(manager.getHelixDataAccessor(), workflow);
+ protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
+ return getWorkflowConfig(manager.getHelixDataAccessor(), workflow);
}
/**
- * Set the resource config
+ * Set the workflow config
* @param accessor Accessor to Helix configs
- * @param resource The resource name
- * @param resourceConfig The resource config to be set
+ * @param workflow The workflow name
+ * @param workflowConfig The workflow config to be set
* @return True if set successfully, otherwise false
*/
- protected static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
- ResourceConfig resourceConfig) {
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+ protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow,
+ WorkflowConfig workflowConfig) {
+ return setResourceConfig(accessor, workflow, workflowConfig);
+ }
+
+ /**
+ * Remove a workflow config.
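+ * @param accessor Accessor to Helix configs
+ * @param workflow The workflow name
+ * @return True if the config was removed or did not exist, otherwise false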
+ */
+ protected static boolean removeWorkflowConfig(HelixDataAccessor accessor, String workflow) {
+ return removeWorkflowJobConfig(accessor, workflow);
}
/**
@@ -199,14 +238,12 @@ public class TaskUtil {
* This method is internal API.
*
* @param propertyStore Property store for the cluster
- * @param jobResource The name of the job
+ * @param job The name of the job
* @return True if remove success, otherwise false
*/
protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
- String jobResource) {
- return propertyStore.remove(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
- AccessOption.PERSISTENT);
+ String job) {
+ return removeWorkflowJobContext(propertyStore, job);
}
/**
@@ -230,25 +267,25 @@ public class TaskUtil {
* This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext();
*
* @param manager a connection to Helix
- * @param workflowResource the name of the workflow
+ * @param workflow the name of the workflow
* @return the {@link WorkflowContext}, or null if none is available
*/
- protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
- return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+ protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
+ return getWorkflowContext(manager.getHelixPropertyStore(), workflow);
}
/**
* Set the runtime context of a single workflow
*
* @param manager a connection to Helix
- * @param workflowResource the name of the workflow
- * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+ * @param workflow the name of the workflow
+ * @param workflowContext the up-to-date {@link WorkflowContext} for the workflow
*/
- protected static void setWorkflowContext(HelixManager manager, String workflowResource,
- WorkflowContext ctx) {
+ protected static void setWorkflowContext(HelixManager manager, String workflow,
+ WorkflowContext workflowContext) {
manager.getHelixPropertyStore().set(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
- ctx.getRecord(), AccessOption.PERSISTENT);
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE),
+ workflowContext.getRecord(), AccessOption.PERSISTENT);
}
/**
@@ -256,11 +293,11 @@ public class TaskUtil {
* This method is internal API.
*
* @param manager A connection to Helix
- * @param workflowResource The name of the workflow
+ * @param workflow The name of the workflow
* @return True if remove success, otherwise false
*/
- protected static boolean removeWorkflowContext(HelixManager manager, String workflowResource) {
- return removeWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+ protected static boolean removeWorkflowContext(HelixManager manager, String workflow) {
+ return removeWorkflowContext(manager.getHelixPropertyStore(), workflow);
}
/**
@@ -268,14 +305,12 @@ public class TaskUtil {
* This method is internal API.
*
* @param propertyStore Property store for the cluster
- * @param workflowResource The name of the workflow
+ * @param workflow The name of the workflow
* @return True if remove success, otherwise false
*/
protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
- String workflowResource) {
- return propertyStore.remove(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource),
- AccessOption.PERSISTENT);
+ String workflow) {
+ return removeWorkflowJobContext(propertyStore, workflow);
}
/**
@@ -303,8 +338,8 @@ public class TaskUtil {
protected static String getWorkflowJobUserContent(HelixManager manager,
String workflowJobResource, String key) {
ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
- .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
- AccessOption.PERSISTENT);
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE),
+ null, AccessOption.PERSISTENT);
return r != null ? r.getSimpleField(key) : null;
}
@@ -333,19 +368,19 @@ public class TaskUtil {
* Get user defined task level key-value pair data
*
* @param manager a connection to Helix
- * @param jobResource the name of job
- * @param taskResource the name of the task
+ * @param job the name of job
+ * @param task the name of the task
* @param key the key of key-value pair
*
* @return null if there is no such pair, otherwise return a String
*/
- protected static String getTaskUserContent(HelixManager manager, String jobResource,
- String taskResource, String key) {
+ protected static String getTaskUserContent(HelixManager manager, String job,
+ String task, String key) {
ZNRecord r = manager.getHelixPropertyStore().get(
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE),
- null, AccessOption.PERSISTENT);
- return r != null ? (r.getMapField(taskResource) != null
- ? r.getMapField(taskResource).get(key)
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null,
+ AccessOption.PERSISTENT);
+ return r != null ? (r.getMapField(task) != null
+ ? r.getMapField(task).get(key)
: null) : null;
}
@@ -353,22 +388,22 @@ public class TaskUtil {
* Add an user defined key-value pair data to task level
*
* @param manager a connection to Helix
- * @param jobResource the name of job
- * @param taskResource the name of task
+ * @param job the name of job
+ * @param task the name of task
* @param key the key of key-value pair
* @param value the value of key-value pair
*/
- protected static void addTaskUserContent(final HelixManager manager, String jobResource,
- final String taskResource, final String key, final String value) {
+ protected static void addTaskUserContent(final HelixManager manager, String job,
+ final String task, final String key, final String value) {
String path =
- Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE);
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE);
manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
@Override public ZNRecord update(ZNRecord znRecord) {
- if (znRecord.getMapField(taskResource) == null) {
- znRecord.setMapField(taskResource, new HashMap<String, String>());
+ if (znRecord.getMapField(task) == null) {
+ znRecord.setMapField(task, new HashMap<String, String>());
}
- znRecord.getMapField(taskResource).put(key, value);
+ znRecord.getMapField(task).put(key, value);
return znRecord;
}
}, AccessOption.PERSISTENT);
@@ -386,25 +421,25 @@ public class TaskUtil {
/**
* Get a workflow-qualified job name for a job in that workflow
*
- * @param workflowResource the name of the workflow
+ * @param workflow the name of the workflow
* @param jobName the un-namespaced name of the job
* @return The namespaced job name, which is just workflowResource_jobName
*/
- public static String getNamespacedJobName(String workflowResource, String jobName) {
- return workflowResource + "_" + jobName;
+ public static String getNamespacedJobName(String workflow, String jobName) {
+ return workflow + "_" + jobName;
}
/**
* Remove the workflow namespace from the job name
*
- * @param workflowResource the name of the workflow that owns the job
+ * @param workflow the name of the workflow that owns the job
* @param jobName the namespaced job name
* @return the denamespaced job name, or the same job name if it is already denamespaced
*/
- public static String getDenamespacedJobName(String workflowResource, String jobName) {
- if (jobName.contains(workflowResource)) {
+ public static String getDenamespacedJobName(String workflow, String jobName) {
+ if (jobName.contains(workflow)) {
// skip the entire length of the work plus the underscore
- return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
+ return jobName.substring(jobName.indexOf(workflow) + workflow.length() + 1);
} else {
return jobName;
}
@@ -416,6 +451,8 @@ public class TaskUtil {
* @param commandConfig map of job config key to config value
* @return serialized string
*/
+ // TODO: move this to the JobConfig
+ @Deprecated
public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -433,6 +470,8 @@ public class TaskUtil {
* @param commandConfig the serialized job config map
* @return a map of job config key to config value
*/
+ // TODO: move this to the JobConfig
+ @Deprecated
public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) {
ObjectMapper mapper = new ObjectMapper();
try {
@@ -446,18 +485,13 @@ public class TaskUtil {
return Collections.emptyMap();
}
- private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- return accessor.getProperty(keyBuilder.resourceConfig(resource));
- }
-
/**
* Extracts the partition id from the given partition name.
*
* @param pName
* @return
*/
- public static int getPartitionId(String pName) {
+ protected static int getPartitionId(String pName) {
int index = pName.lastIndexOf("_");
if (index == -1) {
throw new HelixException("Invalid partition name " + pName);
@@ -465,12 +499,320 @@ public class TaskUtil {
return Integer.valueOf(pName.substring(index + 1));
}
- public static String getWorkflowContextKey(String resource) {
+ @Deprecated
+ public static String getWorkflowContextKey(String workflow) {
// TODO: fix this to use the keyBuilder.
- return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+ return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow);
}
- public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String workflow) {
+ @Deprecated
+ public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, String workflow) {
return accessor.keyBuilder().resourceConfig(workflow);
}
+
+ /**
+ * Cleans up IdealState and external view associated with a job.
+ *
+ * @param accessor
+ * @param job
+ * @return True if remove success, otherwise false
+ */
+ protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, String job) {
+ return cleanupIdealStateExtView(accessor, job);
+ }
+
+ /**
+ * Cleans up IdealState and external view associated with a workflow.
+ *
+ * @param accessor
+ * @param workflow
+ * @return True if remove success, otherwise false
+ */
+ protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor,
+ String workflow) {
+ return cleanupIdealStateExtView(accessor, workflow);
+ }
+
+ /**
+ * Cleans up IdealState and external view associated with a job/workflow resource.
+ */
+ private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor,
+ String workflowJobResource) {
+ boolean success = true;
+ PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
+ if (accessor.getProperty(isKey) != null) {
+ if (!accessor.removeProperty(isKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.",
+ workflowJobResource, isKey));
+ success = false;
+ }
+ }
+
+ // Delete external view
+ PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource);
+ if (accessor.getProperty(evKey) != null) {
+ if (!accessor.removeProperty(evKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.",
+ workflowJobResource, evKey));
+ success = false;
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ * Remove a workflow and all jobs for the workflow. This removes the workflow config, idealstate,
+ * externalview and workflow contexts associated with this workflow, and all jobs information,
+ * including their configs, context, IS and EV.
+ *
+ * @param manager
+ * @param workflow the workflow name.
+ * @param jobs all job names in this workflow.
+ *
+ * @return True if remove success, otherwise false
+ */
+ protected static boolean removeWorkflow(final HelixManager manager, String workflow,
+ Set<String> jobs) {
+ boolean success = true;
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+ // clean up all jobs
+ for (String job : jobs) {
+ if (!removeJob(accessor, manager.getHelixPropertyStore(), job)) {
+ success = false;
+ }
+ }
+
+ if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove workflow idealstate/externalview for %s.",
+ workflow));
+ success = false;
+ }
+ if (!removeWorkflowConfig(accessor, workflow)) {
+ LOG.warn(
+ String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+ success = false;
+ }
+ if (!removeWorkflowContext(manager, workflow)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove workflow context for %s.", workflow));
+ success = false;
+ }
+
+ return success;
+ }
+
+ /**
+ * Remove a set of jobs from a workflow. This removes the config, context, IS and EV associated
+ * with each individual job, and removes all the jobs from the WorkflowConfig, and job states from
+ * WorkflowContext.
+ *
+ * @param dataAccessor
+ * @param propertyStore
+ * @param jobs
+ * @param workflow
+ * @param maintainDependency
+ *
+ * @return True if remove success, otherwise false
+ */
+ protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor,
+ final HelixPropertyStore propertyStore, final String workflow, final Set<String> jobs,
+ boolean maintainDependency) {
+ boolean success = true;
+ if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) {
+ LOG.warn("Error occurred while trying to remove jobs + " + jobs + " from the workflow "
+ + workflow);
+ success = false;
+ }
+ if (!removeJobsState(propertyStore, workflow, jobs)) {
+ LOG.warn(
+ "Error occurred while trying to remove jobs states from workflow + " + workflow + " jobs "
+ + jobs);
+ success = false;
+ }
+ for (String job : jobs) {
+ if (!removeJob(dataAccessor, propertyStore, job)) {
+ success = false;
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ * Return all jobs that are COMPLETED and have passed their expiry time.
+ *
+ * @param dataAccessor
+ * @param propertyStore
+ * @param workflowConfig
+ * @param workflowContext
+ *
+ * @return
+ */
+ protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
+ HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ Set<String> expiredJobs = new HashSet<String>();
+
+ if (workflowContext != null) {
+ Map<String, TaskState> jobStates = workflowContext.getJobStates();
+ for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
+ JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+ long expiry = jobConfig.getExpiry();
+ if (expiry == workflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+ expiry = workflowConfig.getExpiry();
+ }
+ if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+ if (System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+ expiredJobs.add(job);
+ }
+ }
+ }
+ }
+ return expiredJobs;
+ }
+
+ /* remove IS/EV, config and context of a job */
+ // The job name here should be the namespaced job name.
+ private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+ String job) {
+ boolean success = true;
+ if (!cleanupJobIdealStateExtView(accessor, job)) {
+ LOG.warn(String
+ .format("Error occurred while trying to remove job idealstate/externalview for %s.",
+ job));
+ success = false;
+ }
+ if (!removeJobConfig(accessor, job)) {
+ LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
+ success = false;
+ }
+ if (!removeJobContext(propertyStore, job)) {
+ LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
+ success = false;
+ }
+
+ return success;
+ }
+
+ /** Remove the given job names from the DAG in the queue configuration */
+ // Job name should be namespaced job name here.
+ private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+ final Set<String> jobsToRemove, final boolean maintainDependency) {
+ // Now atomically clear the DAG
+ DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
+ if (jobDag == null) {
+ LOG.warn("Could not update DAG for workflow: " + workflow + " JobDag is null.");
+ return null;
+ }
+ for (String job : jobsToRemove) {
+ jobDag.removeNode(job, maintainDependency);
+ }
+ try {
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ return currentData;
+ }
+ };
+
+ String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath();
+ if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) {
+ LOG.warn("Failed to remove jobs " + jobsToRemove + " from DAG of workflow " + workflow);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Update the workflow's context to remove jobs from JOB_STATES if they are already started.
+ */
+ private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+ final String workflow, final Set<String> jobs) {
+ String contextPath =
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
+
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+ @Override public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ WorkflowContext workflowContext = new WorkflowContext(currentData);
+ workflowContext.removeJobStates(jobs);
+ currentData = workflowContext.getRecord();
+ }
+ return currentData;
+ }
+ };
+ if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) {
+ LOG.warn("Fail to remove job state for jobs " + jobs + " from workflow " + workflow);
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean removeWorkflowJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+ String workflowJobResource) {
+ String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource);
+ if (propertyStore.exists(path, AccessOption.PERSISTENT)) {
+ if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove workflow/jobcontext for %s. Failed to remove node %s.",
+ workflowJobResource, path));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Remove workflow or job config.
+ *
+ * @param accessor
+ * @param workflowJobResource the workflow or job name
+ */
+ private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
+ String workflowJobResource) {
+ PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
+ if (accessor.getProperty(cfgKey) != null) {
+ if (!accessor.removeProperty(cfgKey)) {
+ LOG.warn(String.format(
+ "Error occurred while trying to remove config for %s. Failed to remove node %s.",
+ workflowJobResource, cfgKey));
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Set the resource config
+ * @param accessor Accessor to Helix configs
+ * @param resource The resource name
+ * @param resourceConfig The resource config to be set
+ * @return True if set successfully, otherwise false
+ */
+ private static boolean setResourceConfig(HelixDataAccessor accessor, String resource,
+ ResourceConfig resourceConfig) {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig);
+ }
+
+ private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ return accessor.getProperty(keyBuilder.resourceConfig(resource));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c6e971..80b5973 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -149,6 +149,14 @@ public class WorkflowConfig extends ResourceConfig {
.fromJson(getSimpleConfig(WorkflowConfigProperty.Dag.name())) : DEFAULT_JOB_DAG;
}
+ protected void setJobDag(JobDag jobDag) {
+ try {
+ putSimpleConfig(WorkflowConfigProperty.Dag.name(), jobDag.toJson());
+ } catch (IOException ex) {
+ throw new HelixException("Invalid job dag configuration!", ex);
+ }
+ }
+
public int getParallelJobs() {
return _record
.getIntField(WorkflowConfigProperty.ParallelJobs.name(), DEFAULT_PARALLEL_JOBS);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index cc21ce3..563e2e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -20,6 +20,10 @@ package org.apache.helix.task;
*/
import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
@@ -36,7 +40,9 @@ public class WorkflowContext extends HelixProperty {
JOB_STATES,
LAST_SCHEDULED_WORKFLOW,
SCHEDULED_WORKFLOWS,
+ LAST_PURGE_TIME
}
+
public static final int UNSTARTED = -1;
public static final int UNFINISHED = -1;
@@ -45,41 +51,48 @@ public class WorkflowContext extends HelixProperty {
}
public void setWorkflowState(TaskState s) {
- if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) {
+ String workflowState = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+ if (workflowState == null) {
_record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
- } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name())
- .equals(TaskState.FAILED.name()) && !_record
- .getSimpleField(WorkflowContextProperties.STATE.name())
+ } else if (!workflowState.equals(TaskState.FAILED.name()) && !workflowState
.equals(TaskState.COMPLETED.name())) {
_record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name());
}
}
public TaskState getWorkflowState() {
- String s = _record.getSimpleField(WorkflowContextProperties.STATE.name());
- if (s == null) {
- return null;
+ String state = _record.getSimpleField(WorkflowContextProperties.STATE.name());
+ if (state == null) {
+ return TaskState.NOT_STARTED;
}
- return TaskState.valueOf(s);
+ return TaskState.valueOf(state);
}
- public void setJobState(String jobResource, TaskState s) {
+ public void setJobState(String job, TaskState s) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
states = new TreeMap<>();
_record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
}
- states.put(jobResource, s.name());
+ states.put(job, s.name());
+ }
+
+ protected void removeJobStates(Set<String> jobs) {
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ if (states != null) {
+ states.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+ }
}
- public TaskState getJobState(String jobResource) {
+ public TaskState getJobState(String job) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
return null;
}
- String s = states.get(jobResource);
+ String s = states.get(job);
if (s == null) {
return null;
}
@@ -89,7 +102,8 @@ public class WorkflowContext extends HelixProperty {
public Map<String, TaskState> getJobStates() {
Map<String, TaskState> jobStates = new HashMap<>();
- Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ Map<String, String> stateFieldMap =
+ _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (stateFieldMap != null) {
for (Map.Entry<String, String> state : stateFieldMap.entrySet()) {
jobStates.put(state.getKey(), TaskState.valueOf(state.getValue()));
@@ -131,17 +145,25 @@ public class WorkflowContext extends HelixProperty {
List<String> workflows = getScheduledWorkflows();
if (workflows == null) {
workflows = new ArrayList<>();
- _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
}
workflows.add(workflow);
+ _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows);
}
public String getLastScheduledSingleWorkflow() {
return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name());
}
+ protected void setLastJobPurgeTime(long epochTime) {
+ _record.setSimpleField(WorkflowContextProperties.LAST_PURGE_TIME.name(),
+ String.valueOf(epochTime));
+ }
+
+ public long getLastJobPurgeTime() {
+ return _record.getLongField(WorkflowContextProperties.LAST_PURGE_TIME.name(), -1);
+ }
+
public List<String> getScheduledWorkflows() {
return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name());
}
-
}
[3/4] helix git commit: Clean up jobs in a jobqueue automatically
after the job completes and passes its expiry time.
Posted by jx...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8e72f7a..6e6727c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.collect.Lists;
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.IdealStateBuilder;
-import org.apache.log4j.Logger;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -39,6 +30,23 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.log4j.Logger;
+
+
/**
* Custom rebalancer implementation for the {@code Workflow} in task state model.
*/
@@ -52,7 +60,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
LOG.debug("Computer Best Partition for workflow: " + workflow);
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+ WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_manager, workflow);
if (workflowCfg == null) {
LOG.warn("Workflow configuration is NULL for " + workflow);
return buildEmptyAssignment(workflow, currStateOutput);
@@ -70,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
- cleanupWorkflow(workflow, workflowCfg);
+ cleanupWorkflow(workflow, workflowCfg);
return buildEmptyAssignment(workflow, currStateOutput);
}
@@ -124,7 +132,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
}
- cleanExpiredJobs(workflowCfg, workflowCtx);
+ // clean up the expired jobs if it is a queue.
+ if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
+ purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+ }
TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
return buildEmptyAssignment(workflow, currStateOutput);
@@ -158,7 +169,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
// check ancestor job status
if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_manager, job);
+
// Since the start time is calculated base on the time of completion of parent jobs for this
// job, the calculated start time should only be calculate once. Persist the calculated time
// in WorkflowContext znode.
@@ -440,140 +452,61 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
- * Cleans up workflow configs and workflow contexts associated with this workflow,
- * including all job-level configs and context, plus workflow-level information.
+ * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
+ * contexts associated with this workflow, and all jobs information, including their configs,
+ * context, IS and EV.
*/
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- /*
- if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
- LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
- return;
- }*/
-
- for (String job : workflowcfg.getJobDag().getAllNodes()) {
- cleanupJob(job, workflow);
- }
-
- // clean up workflow-level info if this was the last in workflow
if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
- // clean up IS & EV
- cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
-
- // delete workflow config
- PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
- if (accessor.getProperty(workflowCfgKey) != null) {
- if (!accessor.removeProperty(workflowCfgKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
- workflow, workflowCfgKey));
- }
+ Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
+ // Remove all pending timer tasks for this workflow if exists
+ _scheduledRebalancer.removeScheduledRebalance(workflow);
+ for (String job : jobs) {
+ _scheduledRebalancer.removeScheduledRebalance(job);
}
- // Delete workflow context
- LOG.info("Removing workflow context: " + workflow);
- if (!TaskUtil.removeWorkflowContext(_manager, workflow)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up workflow %s. Aborting further clean up steps.",
- workflow));
+ if (!TaskUtil.removeWorkflow(_manager, workflow, jobs)) {
+ LOG.warn("Failed to clean up workflow " + workflow);
}
-
- // Remove pending timer task for this workflow if exists
- _scheduledRebalancer.removeScheduledRebalance(workflow);
+ } else {
+ LOG.info("Did not clean up workflow " + workflow
+ + " because neither the workflow is non-terminable nor is set to DELETE.");
}
}
-
/**
- * Cleans up job configs and job contexts associated with this job,
- * including all job-level configs and context, plus the job info in the workflow context.
+ * Clean up all jobs that are COMPLETED and have passed their expiry time.
+ *
+ * @param workflowConfig
+ * @param workflowContext
*/
- private void cleanupJob(final String job, String workflow) {
- LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
- // Remove any idealstate and externalView.
- cleanupIdealStateExtView(accessor, job);
-
- // Remove DAG references in workflow
- PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
- DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override public ZNRecord update(ZNRecord currentData) {
- if (currentData != null) {
- JobDag jobDag = JobDag.fromJson(
- currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
- for (String child : jobDag.getDirectChildren(job)) {
- jobDag.getChildrenToParents().get(child).remove(job);
- }
- for (String parent : jobDag.getDirectParents(job)) {
- jobDag.getParentsToChildren().get(parent).remove(job);
- }
- jobDag.getChildrenToParents().remove(job);
- jobDag.getParentsToChildren().remove(job);
- jobDag.getAllNodes().remove(job);
- try {
- currentData
- .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
- } catch (Exception e) {
- LOG.error("Could not update DAG for job: " + job, e);
- }
- } else {
- LOG.error("Could not update DAG for job: " + job + " ZNRecord is null.");
- }
- return currentData;
- }
- };
- accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
- AccessOption.PERSISTENT);
-
- // Delete job configs.
- PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
- if (accessor.getProperty(cfgKey) != null) {
- if (!accessor.removeProperty(cfgKey)) {
- LOG.error(String.format(
- "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
- job, cfgKey));
- }
- }
-
- // Delete job context
- // For recurring workflow, it's OK if the node doesn't exist.
- if (!TaskUtil.removeJobContext(_manager, job)) {
- LOG.warn(String.format("Error occurred while trying to clean up job %s.", job));
- }
-
- LOG.info(String.format("Successfully cleaned up job context %s.", job));
-
- _scheduledRebalancer.removeScheduledRebalance(job);
- }
-
- private void cleanExpiredJobs(WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
- if (workflowContext == null) {
+ // TODO: run this in a separate thread.
+ // Get all jobConfigs & jobContext from ClusterCache.
+ protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
return;
}
- Map<String, TaskState> jobStates = workflowContext.getJobStates();
- long newTimeToClean = Long.MAX_VALUE;
- for (String job : workflowConfig.getJobDag().getAllNodes()) {
- JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
- JobContext jobContext = TaskUtil.getJobContext(_manager, job);
- // There is no ABORTED state for JobQueue Job. The job will die with workflow
- if (jobContext != null && jobStates.containsKey(job) && (
- jobStates.get(job) == TaskState.COMPLETED || jobStates.get(job) == TaskState.FAILED)) {
- if (System.currentTimeMillis() >= jobConfig.getExpiry() + jobContext.getFinishTime()) {
- cleanupJob(job, workflowConfig.getWorkflowId());
- } else {
- newTimeToClean =
- Math.min(newTimeToClean, jobConfig.getExpiry() + jobContext.getFinishTime());
- }
- }
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflowConfig, workflowContext);
+ for (String job : expiredJobs) {
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+ if (!TaskUtil
+ .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflow, expiredJobs, true)) {
+ LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
}
- if (newTimeToClean < Long.MAX_VALUE && newTimeToClean < _scheduledRebalancer
- .getRebalanceTime(workflowConfig.getWorkflowId())) {
- _scheduledRebalancer
- .scheduleRebalance(_manager, workflowConfig.getWorkflowId(), newTimeToClean);
+ long currentTime = System.currentTimeMillis();
+ long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
+ workflowContext.setLastJobPurgeTime(currentTime);
+ long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+ if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+ _scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
index 7688017..ef8f971 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java
@@ -116,7 +116,7 @@ public class TaskAdmin {
driver.flushQueue(workflow);
break;
case clean:
- driver.cleanupJobQueue(workflow);
+ driver.cleanupQueue(workflow);
break;
default:
throw new IllegalArgumentException("Unknown command " + args[0]);
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 641f13a..6036732 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -212,9 +212,10 @@ public class TaskTestUtil {
}
public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
- int failureThreshold) {
+ int failureThreshold, int capacity) {
WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
workflowCfgBuilder.setExpiry(120000);
+ workflowCfgBuilder.setCapacity(capacity);
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -228,8 +229,17 @@ public class TaskTestUtil {
return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+ int failureThreshold) {
+ return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500);
+ }
+
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
- return buildJobQueue(jobQueueName, 0, 0);
+ return buildJobQueue(jobQueueName, 0, 0, 500);
+ }
+
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) {
+ return buildJobQueue(jobQueueName, 0, 0, capacity);
}
public static WorkflowContext buildWorkflowContext(String workflowResource,
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index 71fed49..a0a1617 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -52,7 +52,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.start(builder.build());
_driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
TaskState.FAILED);
- _driver.cleanupJobQueue(queueName);
+ _driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
}
@@ -71,7 +71,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.start(builder.build());
_driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),
TaskState.IN_PROGRESS);
- _driver.cleanupJobQueue(queueName);
+ _driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/e530bf51/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index 16df022..4d0c3c6 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -19,6 +19,9 @@ package org.apache.helix.task;
* under the License.
*/
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.integration.task.MockTask;
@@ -39,34 +42,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
@Test
public void testCleanExpiredJobs() throws Exception {
- String workflowName = TestHelper.getTestMethodName();
- JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
long startTime = System.currentTimeMillis();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 8; i++) {
builder.enqueueJob("JOB" + i, jobBuilder);
- TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB" + i),
+ }
+
+ for (int i = 0; i < 8; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
}
+ for (int i = 4; i < 6; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+ TaskTestUtil
+ .buildJobContext(startTime, startTime + 100000, TaskPartitionState.COMPLETED));
+ }
+
WorkflowContext workflowContext = TaskTestUtil
- .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
- TaskState.FAILED, TaskState.ABORTED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+ TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, TaskState.COMPLETED,
+ TaskState.COMPLETED, TaskState.IN_PROGRESS, TaskState.NOT_STARTED);
+
+ Set<String> jobsLeft = new HashSet<String>();
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 1));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 2));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 4));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 5));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 6));
+ jobsLeft.add(TaskUtil.getNamespacedJobName(queue, "JOB" + 7));
+
_driver.start(builder.build());
_cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
- TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
TaskTestUtil.calculateBestPossibleState(_cache, _manager);
- WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
- Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 3);
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+ workflowContext = _driver.getWorkflowContext(queue);
+ Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
+ && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());
}
- @Test void testNotCleanJobsDueToParentFail() throws Exception {
- String workflowName = TestHelper.getTestMethodName();
- JobQueue.Builder builder = TaskTestUtil.buildJobQueue(workflowName);
+ @Test
+ void testNotCleanJobsDueToParentFail() throws Exception {
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
@@ -76,17 +102,57 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
builder.enqueueJob("JOB0", jobBuilder);
builder.enqueueJob("JOB1", jobBuilder);
builder.addParentChildDependency("JOB0", "JOB1");
- TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(workflowName, "JOB0"),
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB0"),
TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
WorkflowContext workflowContext = TaskTestUtil
- .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.FAILED,
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.FAILED,
TaskState.FAILED);
_driver.start(builder.build());
_cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
- TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
TaskTestUtil.calculateBestPossibleState(_cache, _manager);
- WorkflowConfig workflowConfig = _driver.getWorkflowConfig(workflowName);
- Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 1);
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), 2);
+ }
+
+ @Test
+ void testNotCleanJobsThroughEnqueueJob() throws Exception {
+ int capacity = 5;
+ String queue = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue, capacity);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+ .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ }
+
+ _driver.start(builder.build());
+ try {
+ // should fail here since the queue is full.
+ _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+ Assert.fail("Queue is not full.");
+ } catch (HelixException e) {
+ Assert.assertTrue(e.getMessage().contains("queue is full"));
+ }
+
+ for (int i = 0; i < capacity; i++) {
+ TaskUtil.setJobContext(_manager, TaskUtil.getNamespacedJobName(queue, "JOB" + i),
+ TaskTestUtil.buildJobContext(startTime, startTime, TaskPartitionState.COMPLETED));
+ }
+
+ WorkflowContext workflowContext = TaskTestUtil
+ .buildWorkflowContext(queue, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.IN_PROGRESS);
+ TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
+
+ _driver.enqueueJob(queue, "JOB" + capacity, jobBuilder);
+
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
+ Assert.assertEquals(workflowConfig.getJobDag().getAllNodes().size(), capacity - 1);
}
}
[2/4] helix git commit: Support configurable job purge interval for a
queue.
Posted by jx...@apache.org.
Support configurable job purge interval for a queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0aeb5579
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0aeb5579
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0aeb5579
Branch: refs/heads/master
Commit: 0aeb5579d4f7130d6a4310d2d817e0153620cae0
Parents: e530bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 7 14:59:10 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Oct 6 12:23:47 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 8 +-
.../org/apache/helix/task/TaskRebalancer.java | 1 -
.../java/org/apache/helix/task/TaskUtil.java | 13 +--
.../java/org/apache/helix/task/Workflow.java | 11 ++
.../org/apache/helix/task/WorkflowConfig.java | 68 ++++++++++-
.../org/apache/helix/task/WorkflowContext.java | 75 ++++++++++--
.../apache/helix/task/WorkflowRebalancer.java | 114 +++++++++++--------
.../integration/TestDelayedAutoRebalance.java | 29 +++--
.../integration/task/TestJobQueueCleanUp.java | 49 ++++++++
9 files changed, 281 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 15c906a..f21c005 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,10 +47,6 @@ import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
/**
* CLI for scheduling/canceling workflows
*/
@@ -117,8 +112,7 @@ public class TaskDriver {
flow.validate();
WorkflowConfig newWorkflowConfig =
- new WorkflowConfig.Builder().setConfigMap(flow.getResourceConfigMap())
- .setWorkflowId(flow.getName()).build();
+ new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
Map<String, String> jobTypes = new HashMap<String, String>();
// add all job configs.
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 20a9233..5dbb2a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
* Abstract rebalancer class for the {@code Task} state model.
*/
public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
- public static final String START_TIME_KEY = "StartTime";
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
protected static long JOB_PURGE_INTERVAL = 10 * 60 * 1000;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index f064bbf..42d252e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,17 +36,14 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
/**
* Static utility methods.
@@ -679,7 +676,7 @@ public class TaskUtil {
/* remove IS/EV, config and context of a job */
// Jobname is here should be NamespacedJobName.
- private static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
+ protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
String job) {
boolean success = true;
if (!cleanupJobIdealStateExtView(accessor, job)) {
@@ -688,7 +685,7 @@ public class TaskUtil {
job));
success = false;
}
- if (!removeJobConfig(accessor, job)) {
+ if (!removeWorkflowJobConfig(accessor, job)) {
LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
success = false;
}
@@ -702,7 +699,8 @@ public class TaskUtil {
/** Remove the job name from the DAG from the queue configuration */
// Job name should be namespaced job name here.
- private static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
+
+ protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, final String workflow,
final Set<String> jobsToRemove, final boolean maintainDependency) {
// Now atomically clear the DAG
DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@@ -741,7 +739,7 @@ public class TaskUtil {
/**
* update workflow's property to remove jobs from JOB_STATES if there are already started.
*/
- private static boolean removeJobsState(final HelixPropertyStore propertyStore,
+ protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
final String workflow, final Set<String> jobs) {
String contextPath =
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -751,6 +749,7 @@ public class TaskUtil {
if (currentData != null) {
WorkflowContext workflowContext = new WorkflowContext(currentData);
workflowContext.removeJobStates(jobs);
+ workflowContext.removeJobStartTime(jobs);
currentData = workflowContext.getRecord();
}
return currentData;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index a7060c3..74c325e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -297,11 +297,22 @@ public class Workflow {
return this;
}
+ /**
+ * Set the config via an existing workflowConfig.
+ * CAUTION: all previous settings will be overridden by the settings given here.
+ *
+ * @param workflowConfig
+ * @return
+ */
public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
_workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
return this;
}
+ public WorkflowConfig getWorkflowConfig() {
+ return _workflowConfigBuilder.build();
+ }
+
public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
if (_workflowConfigBuilder == null) {
_workflowConfigBuilder = new WorkflowConfig.Builder();
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 80b5973..cc0fdce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -63,7 +63,8 @@ public class WorkflowConfig extends ResourceConfig {
JobTypes,
IsJobQueue,
/* Allow multiple jobs in this workflow to be assigned to a same instance or not */
- AllowOverlapJobAssignment
+ AllowOverlapJobAssignment,
+ JobPurgeInterval
}
/* Default values */
@@ -77,6 +78,7 @@ public class WorkflowConfig extends ResourceConfig {
public static final boolean DEFAULT_JOB_QUEUE = false;
public static final boolean DEFAULT_MONITOR_DISABLE = true;
public static final boolean DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT = false;
+ protected static final long DEFAULT_JOB_PURGE_INTERVAL = 30 * 60 * 1000; //default 30 minutes
public WorkflowConfig(HelixProperty property) {
super(property.getRecord());
@@ -85,7 +87,7 @@ public class WorkflowConfig extends ResourceConfig {
public WorkflowConfig(WorkflowConfig cfg, String workflowId) {
this(workflowId, cfg.getJobDag(), cfg.getParallelJobs(), cfg.getTargetState(), cfg.getExpiry(),
cfg.getFailureThreshold(), cfg.isTerminable(), cfg.getScheduleConfig(), cfg.getCapacity(),
- cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment());
+ cfg.getWorkflowType(), cfg.isJobQueue(), cfg.getJobTypes(), cfg.isAllowOverlapJobAssignment(),cfg.getJobPurgeInterval());
}
/* Member variables */
@@ -94,7 +96,7 @@ public class WorkflowConfig extends ResourceConfig {
protected WorkflowConfig(String workflowId, JobDag jobDag, int parallelJobs,
TargetState targetState, long expiry, int failureThreshold, boolean terminable,
ScheduleConfig scheduleConfig, int capacity, String workflowType, boolean isJobQueue,
- Map<String, String> jobTypes, boolean allowOverlapJobAssignment) {
+ Map<String, String> jobTypes, boolean allowOverlapJobAssignment, long purgeInterval) {
super(workflowId);
putSimpleConfig(WorkflowConfigProperty.WorkflowID.name(), workflowId);
@@ -110,6 +112,7 @@ public class WorkflowConfig extends ResourceConfig {
putSimpleConfig(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue));
putSimpleConfig(WorkflowConfigProperty.FailureThreshold.name(), String.valueOf(failureThreshold));
putSimpleConfig(WorkflowConfigProperty.AllowOverlapJobAssignment.name(), String.valueOf(allowOverlapJobAssignment));
+ putSimpleConfig(WorkflowConfigProperty.JobPurgeInterval.name(), String.valueOf(purgeInterval));
if (capacity > 0) {
putSimpleConfig(WorkflowConfigProperty.capacity.name(), String.valueOf(capacity));
@@ -171,6 +174,11 @@ public class WorkflowConfig extends ResourceConfig {
return _record.getLongField(WorkflowConfigProperty.Expiry.name(), DEFAULT_EXPIRY);
}
+ public long getJobPurgeInterval() {
+ return _record
+ .getLongField(WorkflowConfigProperty.JobPurgeInterval.name(), DEFAULT_JOB_PURGE_INTERVAL);
+ }
+
/**
* This Failure threshold only works for generic workflow. Will be ignored by JobQueue
* @return
@@ -312,13 +320,14 @@ public class WorkflowConfig extends ResourceConfig {
private boolean _isJobQueue = DEFAULT_JOB_QUEUE;
private Map<String, String> _jobTypes;
private boolean _allowOverlapJobAssignment = DEFAULT_ALLOW_OVERLAP_JOB_ASSIGNMENT;
+ private long _jobPurgeInterval = DEFAULT_JOB_PURGE_INTERVAL;
public WorkflowConfig build() {
validate();
return new WorkflowConfig(_workflowId, _taskDag, _parallelJobs, _targetState, _expiry,
_failureThreshold, _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue,
- _jobTypes, _allowOverlapJobAssignment);
+ _jobTypes, _allowOverlapJobAssignment, _jobPurgeInterval);
}
public Builder() {}
@@ -337,6 +346,7 @@ public class WorkflowConfig extends ResourceConfig {
_isJobQueue = workflowConfig.isJobQueue();
_jobTypes = workflowConfig.getJobTypes();
_allowOverlapJobAssignment = workflowConfig.isAllowOverlapJobAssignment();
+ _jobPurgeInterval = workflowConfig.getJobPurgeInterval();
}
public Builder setWorkflowId(String v) {
@@ -359,16 +369,53 @@ public class WorkflowConfig extends ResourceConfig {
return this;
}
+ /**
+ * The expiry time for this workflow. Helix may clean up the workflow information after the
+ * expiry time from the completion of the workflow.
+ *
+ * @param v
+ * @param unit
+ *
+ * @return
+ */
public Builder setExpiry(long v, TimeUnit unit) {
_expiry = unit.toMillis(v);
return this;
}
+ /**
+ * The expiry time for this workflow. Helix may clean up the workflow information after the
+ * expiry time from the completion of the workflow.
+ *
+ * @param v in milliseconds
+ *
+ * @return
+ */
public Builder setExpiry(long v) {
_expiry = v;
return this;
}
+ /**
+ * The interval at which Helix periodically cleans up all completed jobs. This config applies
+ * only to JobQueue.
+ *
+ * @param t in milliseconds
+ *
+ * @return
+ */
+ public Builder setJobPurgeInterval(long t) {
+ _jobPurgeInterval = t;
+ return this;
+ }
+
+ /**
+ * The maximum allowed number of failed jobs before Helix marks the workflow as failed.
+ *
+ * @param failureThreshold
+ *
+ * @return
+ */
public Builder setFailureThreshold(int failureThreshold) {
_failureThreshold = failureThreshold;
return this;
@@ -376,7 +423,7 @@ public class WorkflowConfig extends ResourceConfig {
/**
* This method only applies for JobQueue, will be ignored in generic workflows
- * @param capacity The number of capacity
+ * @param capacity The max number of jobs allowed in the queue
* @return This builder
*/
public Builder setCapacity(int capacity) {
@@ -389,7 +436,7 @@ public class WorkflowConfig extends ResourceConfig {
return this;
}
- public Builder setTerminable(boolean isTerminable) {
+ protected Builder setTerminable(boolean isTerminable) {
_isTerminable = isTerminable;
return this;
}
@@ -425,6 +472,7 @@ public class WorkflowConfig extends ResourceConfig {
builder.setConfigMap(cfg);
return builder;
}
+
// TODO: Add API to set map fields. This API only set simple fields
public Builder setConfigMap(Map<String, String> cfg) {
if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
@@ -459,6 +507,14 @@ public class WorkflowConfig extends ResourceConfig {
}
}
+ if (cfg.containsKey(WorkflowConfigProperty.JobPurgeInterval.name())) {
+ long jobPurgeInterval =
+ Long.valueOf(cfg.get(WorkflowConfigProperty.JobPurgeInterval.name()));
+ if (jobPurgeInterval > 0) {
+ setJobPurgeInterval(jobPurgeInterval);
+ }
+ }
+
if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
int threshold = Integer.valueOf(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
if (threshold >= 0) {
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 563e2e8..2fc4fe1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
/**
* Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
* property store
*/
public class WorkflowContext extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(WorkflowContext.class);
+
protected enum WorkflowContextProperties {
STATE,
START_TIME,
@@ -40,7 +43,8 @@ public class WorkflowContext extends HelixProperty {
JOB_STATES,
LAST_SCHEDULED_WORKFLOW,
SCHEDULED_WORKFLOWS,
- LAST_PURGE_TIME
+ LAST_PURGE_TIME,
+ StartTime
}
public static final int UNSTARTED = -1;
@@ -78,14 +82,6 @@ public class WorkflowContext extends HelixProperty {
states.put(job, s.name());
}
- protected void removeJobStates(Set<String> jobs) {
- Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
- if (states != null) {
- states.keySet().removeAll(jobs);
- _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
- }
- }
-
public TaskState getJobState(String job) {
Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
if (states == null) {
@@ -100,6 +96,67 @@ public class WorkflowContext extends HelixProperty {
return TaskState.valueOf(s);
}
+ protected void removeJobStates(Set<String> jobs) {
+ Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name());
+ if (states != null) {
+ states.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states);
+ }
+ }
+
+ protected void setJobStartTime(String job, long time) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes == null) {
+ startTimes = new HashMap<>();
+ _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+ }
+ startTimes.put(job, String.valueOf(time));
+ }
+
+ protected void removeJobStartTime(Set<String> jobs) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes != null) {
+ startTimes.keySet().removeAll(jobs);
+ _record.setMapField(WorkflowContextProperties.StartTime.name(), startTimes);
+ }
+ }
+
+ public long getJobStartTime(String job) {
+ Map<String, String> startTimes =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimes == null || !startTimes.containsKey(job)) {
+ return -1;
+ }
+
+ String t = startTimes.get(job);
+ if (t == null) {
+ return -1;
+ }
+
+ try {
+ long ret = Long.valueOf(t);
+ return ret;
+ } catch (NumberFormatException e) {
+ LOG.warn("Number error " + t + " for job start time of " + job);
+ return -1;
+ }
+ }
+
+ public Map<String, Long> getJobStartTimes() {
+ Map<String, Long> startTimes = new HashMap<>();
+ Map<String, String> startTimesMap =
+ _record.getMapField(WorkflowContextProperties.StartTime.name());
+ if (startTimesMap != null) {
+ for (Map.Entry<String, String> time : startTimesMap.entrySet()) {
+ startTimes.put(time.getKey(), Long.valueOf(time.getValue()));
+ }
+ }
+
+ return startTimes;
+ }
+
public Map<String, TaskState> getJobStates() {
Map<String, TaskState> jobStates = new HashMap<>();
Map<String, String> stateFieldMap =
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6e6727c..ac2ac87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -53,9 +53,9 @@ import org.apache.log4j.Logger;
public class WorkflowRebalancer extends TaskRebalancer {
private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
- @Override
- public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
- IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+ @Override public ResourceAssignment computeBestPossiblePartitionState(
+ ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+ CurrentStateOutput currStateOutput) {
final String workflow = resource.getResourceName();
LOG.debug("Computer Best Partition for workflow: " + workflow);
@@ -93,8 +93,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
long currentTime = System.currentTimeMillis();
// Check if workflow has been finished and mark it if it is.
- if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
- && isWorkflowFinished(workflowCtx, workflowCfg)) {
+ if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
+ workflowCfg)) {
workflowCtx.setFinishTime(currentTime);
TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
@@ -123,8 +123,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
// Check for readiness, and stop processing if it's not ready
- boolean isReady =
- scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+ boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
if (isReady) {
// Schedule jobs from this workflow.
scheduleJobs(workflow, workflowCfg, workflowCtx);
@@ -142,10 +141,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
- * Figure out whether the jobs in the workflow should be run,
- * and if it's ready, then just schedule it
+ * Figure out whether the jobs in the workflow should be run, and if it's ready, then just
+ * schedule it
*/
- private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+ private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
+ WorkflowContext workflowCtx) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
LOG.debug("Jobs from recurring workflow are not schedule-able");
@@ -163,7 +163,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
LOG.debug(String.format("Workflow %s already have enough job in progress, "
- + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+ + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
break;
}
@@ -174,25 +174,16 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Since the start time is calculated based on the time of completion of parent jobs for this
// job, the calculated start time should only be calculated once. Persist the calculated time
// in WorkflowContext znode.
- Map<String, String> startTimeMap = workflowCtx.getRecord().getMapField(START_TIME_KEY);
- if (startTimeMap == null) {
- startTimeMap = new HashMap<String, String>();
- workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
- }
-
- long calculatedStartTime = System.currentTimeMillis();
- if (startTimeMap.containsKey(job)) {
- // Get the start time if it is already calculated
- calculatedStartTime = Long.parseLong(startTimeMap.get(job));
- } else {
+ long calculatedStartTime = workflowCtx.getJobStartTime(job);
+ if (calculatedStartTime < 0) {
+ // Calculate the start time if it is not already calculated
+ calculatedStartTime = System.currentTimeMillis();
// If the start time is not calculated before, do the math.
if (jobConfig.getExecutionDelay() >= 0) {
calculatedStartTime += jobConfig.getExecutionDelay();
}
calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
- startTimeMap.put(job, String.valueOf(calculatedStartTime));
- workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
- TaskUtil.setWorkflowContext(_manager, jobConfig.getWorkflow(), workflowCtx);
+ workflowCtx.setJobStartTime(job, calculatedStartTime);
}
// Time is not ready. Set a trigger and update the start time.
@@ -285,9 +276,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
/**
* Check if a workflow is ready to schedule, and schedule a rebalance if it is not
*
- * @param workflow the Helix resource associated with the workflow
- * @param workflowCfg the workflow to check
- * @param workflowCtx the current workflow context
+ * @param workflow the Helix resource associated with the workflow
+ * @param workflowCfg the workflow to check
+ * @param workflowCtx the current workflow context
+ *
* @return true if the workflow is ready for schedule, false if not ready
*/
private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
@@ -330,7 +322,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
long offsetMultiplier = (-delayFromStart) / period;
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
// Now clone the workflow if this clone has not yet been created
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -350,7 +341,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
// Persist workflow start regardless of success to avoid retrying and failing
workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
- TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
}
// Change the time to trigger the pipeline to that of the next run
@@ -379,6 +369,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
* @param origWorkflowName the name of the existing workflow
* @param newWorkflowName the name of the new workflow
* @param newStartTime a provided start time that deviates from the desired start time
+ *
* @return the cloned workflow, or null if there was a problem cloning the existing one
*/
public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -452,9 +443,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
/**
* Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
* contexts associated with this workflow, and all jobs information, including their configs,
* context, IS and EV.
*/
private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
LOG.info("Cleaning up workflow: " + workflow);
@@ -483,36 +479,54 @@ public class WorkflowRebalancer extends TaskRebalancer {
*/
// TODO: run this in a separate thread.
// Get all jobConfigs & jobContext from ClusterCache.
- protected void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+ private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
WorkflowContext workflowContext) {
- if (workflowContext.getLastJobPurgeTime() + JOB_PURGE_INTERVAL > System.currentTimeMillis()) {
- return;
- }
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
- Set<String> expiredJobs = TaskUtil
- .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
- workflowConfig, workflowContext);
- for (String job : expiredJobs) {
- _scheduledRebalancer.removeScheduledRebalance(job);
- }
- if (!TaskUtil
- .removeJobsFromWorkflow(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
- workflow, expiredJobs, true)) {
- LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+ if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobs(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
+ workflowConfig, workflowContext);
+
+ if (expiredJobs.isEmpty()) {
+ LOG.info("No job to purge for the queue " + workflow);
+ } else {
+ LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+ for (String job : expiredJobs) {
+ if (!TaskUtil
+ .removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(), job)) {
+ LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+ }
+ _scheduledRebalancer.removeScheduledRebalance(job);
+ }
+ if (!TaskUtil
+ .removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+ LOG.warn(
+ "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+ + workflow);
+ }
+ // remove job states and job start times in workflowContext.
+ workflowContext.removeJobStates(expiredJobs);
+ workflowContext.removeJobStartTime(expiredJobs);
+ }
+ workflowContext.setLastJobPurgeTime(currentTime);
}
- long currentTime = System.currentTimeMillis();
- long nextPurgeTime = currentTime + JOB_PURGE_INTERVAL;
- workflowContext.setLastJobPurgeTime(currentTime);
+ setNextJobPurgeTime(workflow, currentTime, purgeInterval);
+ }
+
+ private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
+ long nextPurgeTime = currentTime + purgeInterval;
long currentScheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
_scheduledRebalancer.scheduleRebalance(_manager, workflow, nextPurgeTime);
}
}
- @Override
- public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ @Override public IdealState computeNewIdealState(String resourceName,
+ IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+ ClusterDataCache clusterData) {
// Nothing to do here with workflow resource.
return currentIdealState;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index d875a60..f431285 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -177,7 +177,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
_participants.get(0).getInstanceName());
}
@@ -188,7 +190,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
}
}
@@ -222,7 +226,8 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
}
Thread.sleep(delay + 10000);
@@ -230,7 +235,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, _replica);
}
}
@@ -267,8 +274,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
- validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, minActiveReplica);
+ validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
_participants.get(0).getInstanceName());
}
@@ -317,7 +326,9 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String db : _testDBs) {
ExternalView ev =
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(is, ev, _replica);
}
disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
@@ -404,6 +415,10 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
for (String partition : is.getPartitionSet()) {
Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+ Assert.assertNotNull(assignmentMap,
+ is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+ Assert.assertTrue(!assignmentMap.isEmpty(),
+ is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
boolean hasTopState = false;
int activeReplica = 0;
http://git-wip-us.apache.org/repos/asf/helix/blob/0aeb5579/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index a0a1617..6eecf20 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -19,11 +19,16 @@ package org.apache.helix.integration.task;
* under the License.
*/
+import java.util.HashSet;
+import java.util.Set;
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -74,4 +79,48 @@ public class TestJobQueueCleanUp extends TaskTestBase {
_driver.cleanupQueue(queueName);
Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
}
+
+ @Test
+ public void testJobQueueAutoCleanUp() throws InterruptedException {
+ int capacity = 10;
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
+ WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder(builder.getWorkflowConfig());
+ cfgBuilder.setJobPurgeInterval(1000);
+ builder.setWorkflowConfig(cfgBuilder.build());
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2).setJobCommandConfigMap(
+ ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, String.valueOf(capacity / 2)))
+ .setExpiry(200L);
+ Set<String> deletedJobs = new HashSet<String>();
+ Set<String> remainJobs = new HashSet<String>();
+ for (int i = 0; i < capacity; i++) {
+ builder.enqueueJob("JOB" + i, jobBuilder);
+ if (i < capacity/2) {
+ deletedJobs.add("JOB" + i);
+ } else {
+ remainJobs.add(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+ }
+ }
+ _driver.start(builder.build());
+ _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
+ Thread.sleep(2000);
+
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
+ Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+
+ for (String job : deletedJobs) {
+ JobConfig cfg = _driver.getJobConfig(job);
+ JobContext ctx = _driver.getJobContext(job);
+ Assert.assertNull(cfg);
+ Assert.assertNull(ctx);
+ }
+
+ }
}