You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:07 UTC
[11/33] helix git commit: Refactor Workflow and JobQueue builders to
make the builder API cleaner.
Refactor Workflow and JobQueue builders to make the builder API cleaner.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d386aff3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d386aff3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d386aff3
Branch: refs/heads/helix-0.6.x
Commit: d386aff394f2e4e7202f13fe2ed5e6533a8cfb29
Parents: 66dba1f
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Feb 23 17:06:35 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:48:35 2016 -0700
----------------------------------------------------------------------
.../webapp/resources/JobQueuesResource.java | 8 +-
.../webapp/resources/WorkflowsResource.java | 4 +-
.../helix/task/DeprecatedTaskRebalancer.java | 21 +-
.../java/org/apache/helix/task/JobConfig.java | 144 ++++++-------
.../main/java/org/apache/helix/task/JobDag.java | 5 +-
.../java/org/apache/helix/task/JobQueue.java | 98 +++++----
.../java/org/apache/helix/task/TaskDriver.java | 36 ++--
.../java/org/apache/helix/task/TaskUtil.java | 35 ----
.../java/org/apache/helix/task/Workflow.java | 156 ++++++++------
.../org/apache/helix/task/WorkflowConfig.java | 207 +++++++++++++++----
.../apache/helix/task/WorkflowRebalancer.java | 43 ++--
.../helix/integration/task/TaskTestUtil.java | 48 +++--
.../integration/task/TestRecurringJobQueue.java | 54 ++++-
.../task/TestTaskRebalancerParallel.java | 11 +-
.../integration/task/TestUpdateWorkflow.java | 99 +++++++--
.../integration/task/WorkflowGenerator.java | 9 -
16 files changed, 605 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
index 1a5cb17..e0a0657 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
@@ -103,9 +103,11 @@ public class JobQueuesResource extends ServerResource {
Map.Entry<String, HelixProperty> e = it.next();
HelixProperty resource = e.getValue();
Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
- boolean isTerminable = resource.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
- if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
- || !simpleFields.containsKey(WorkflowConfig.DAG) || isTerminable) {
+ boolean isTerminable = resource.getRecord()
+ .getBooleanField(WorkflowConfig.WorkflowConfigProperty.Terminable.name(), true);
+ if (!simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.TargetState.name())
+ || !simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())
+ || isTerminable) {
it.remove();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
index 9175530..c517fea 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -96,8 +96,8 @@ public class WorkflowsResource extends ServerResource {
Map.Entry<String, HelixProperty> e = it.next();
HelixProperty resource = e.getValue();
Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
- if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
- || !simpleFields.containsKey(WorkflowConfig.DAG)) {
+ if (!simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.TargetState.name())
+ || !simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
it.remove();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 6f744f0..855312b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -673,19 +673,21 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
// Create a new workflow with a new name
HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
- JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+ JobDag jobDag =
+ JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
// Set the workflow expiry
- builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+ builder.setExpiry(
+ Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
// Set the schedule, if applicable
ScheduleConfig scheduleConfig;
if (newStartTime != null) {
scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
} else {
- scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
+ scheduleConfig = WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
}
if (scheduleConfig != null) {
builder.setScheduleConfig(scheduleConfig);
@@ -699,7 +701,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
- jobSimpleFields.put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), newWorkflowName); // overwrite workflow name
+ jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), newWorkflowName); // overwrite workflow name
for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
builder.addConfig(job, e.getKey(), e.getValue());
}
@@ -746,7 +748,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
long currentTime = now;
Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
- LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
+ LOG.debug(
+ "Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
SCHEDULED_TIMES.remove(jobResource);
}
@@ -831,7 +834,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
private static void markForDeletion(HelixManager mgr, String resourceName) {
mgr.getConfigAccessor().set(
TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
- WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
+ WorkflowConfig.WorkflowConfigProperty.TargetState.name(), TargetState.DELETE.name());
}
/**
@@ -848,7 +851,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ JobDag jobDag = JobDag
+ .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
for (String child : jobDag.getDirectChildren(resourceName)) {
jobDag.getChildrenToParents().get(child).remove(resourceName);
}
@@ -859,7 +863,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
jobDag.getParentsToChildren().remove(resourceName);
jobDag.getAllNodes().remove(resourceName);
try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
LOG.equals("Could not update DAG for job: " + resourceName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 65a9caf..d423d38 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -44,81 +44,71 @@ public class JobConfig {
/**
* The name of the workflow to which the job belongs.
*/
- WORKFLOW_ID("WorkflowID"),
+ WorkflowID,
/**
* The assignment strategy of this job
*/
- ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+ AssignmentStrategy,
/**
* The name of the target resource.
*/
- TARGET_RESOURCE("TargetResource"),
+ TargetResource,
/**
* The set of the target partition states. The value must be a comma-separated list of partition
* states.
*/
- TARGET_PARTITION_STATES("TargetPartitionStates"),
+ TargetPartitionStates,
/**
* The set of the target partition ids. The value must be a comma-separated list of partition ids.
*/
- TARGET_PARTITIONS("TargetPartitions"),
+ TargetPartitions,
/**
* The command that is to be run by participants in the case of identical tasks.
*/
- COMMAND("Command"),
+ Command,
/**
* The command configuration to be used by the tasks.
*/
- JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+ JobCommandConfig,
/**
* The timeout for a task.
*/
- TIMEOUT_PER_TASK("TimeoutPerPartition"),
+ TimeoutPerPartition,
/**
* The maximum number of times the task rebalancer may attempt to execute a task.
*/
- MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+ MaxAttemptsPerTask,
/**
* The maximum number of times Helix will intentionally move a failing task
*/
- MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+ MaxForcedReassignmentsPerTask,
/**
* The number of concurrent tasks that are allowed to run on an instance.
*/
- NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+ ConcurrentTasksPerInstance,
/**
* The number of tasks within the job that are allowed to fail.
*/
- FAILURE_THRESHOLD("FailureThreshold"),
+ FailureThreshold,
/**
* The amount of time in ms to wait before retrying a task
*/
- TASK_RETRY_DELAY("TaskRetryDelay"),
+ TaskRetryDelay,
/**
* Whether failure of directly dependent jobs should fail this job.
*/
- IGNORE_DEPENDENT_JOB_FAILURE("IgnoreDependentJobFailure"),
+ IgnoreDependentJobFailure,
/**
* The individual task configurations, if any *
*/
- TASK_CONFIGS("TaskConfigs"),
+ TaskConfigs,
/**
* Disable external view (not showing) for this job resource
*/
- DISABLE_EXTERNALVIEW("DisableExternalView");
-
- private final String _value;
-
- private JobConfigProperty(String val) {
- _value = val;
- }
-
- public String value() {
- return _value;
- }
+ DisableExternalView
}
//Default property values
@@ -237,40 +227,40 @@ public class JobConfig {
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
+ cfgMap.put(JobConfigProperty.WorkflowID.name(), _workflow);
if (_command != null) {
- cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
+ cfgMap.put(JobConfigProperty.Command.name(), _command);
}
if (_jobCommandConfigMap != null) {
String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
if (serializedConfig != null) {
- cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
+ cfgMap.put(JobConfigProperty.JobCommandConfig.name(), serializedConfig);
}
}
if (_targetResource != null) {
- cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
+ cfgMap.put(JobConfigProperty.TargetResource.name(), _targetResource);
}
if (_targetPartitionStates != null) {
- cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+ cfgMap.put(JobConfigProperty.TargetPartitionStates.name(),
Joiner.on(",").join(_targetPartitionStates));
}
if (_targetPartitions != null) {
cfgMap
- .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
+ .put(JobConfigProperty.TargetPartitions.name(), Joiner.on(",").join(_targetPartitions));
}
if (_retryDelay > 0) {
- cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
+ cfgMap.put(JobConfigProperty.TaskRetryDelay.name(), "" + _retryDelay);
}
- cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
- cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
- cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+ cfgMap.put(JobConfigProperty.TimeoutPerPartition.name(), "" + _timeoutPerTask);
+ cfgMap.put(JobConfigProperty.MaxAttemptsPerTask.name(), "" + _maxAttemptsPerTask);
+ cfgMap.put(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
"" + _maxForcedReassignmentsPerTask);
- cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
- cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+ cfgMap.put(JobConfigProperty.FailureThreshold.name(), "" + _failureThreshold);
+ cfgMap.put(JobConfigProperty.DisableExternalView.name(),
Boolean.toString(_disableExternalView));
- cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+ cfgMap.put(JobConfigProperty.ConcurrentTasksPerInstance.name(),
"" + _numConcurrentTasksPerInstance);
- cfgMap.put(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value(),
+ cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
Boolean.toString(_ignoreDependentJobFailure));
return cfgMap;
}
@@ -312,56 +302,56 @@ public class JobConfig {
*/
public static Builder fromMap(Map<String, String> cfg) {
Builder b = new Builder();
- if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
- b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
+ if (cfg.containsKey(JobConfigProperty.WorkflowID.name())) {
+ b.setWorkflow(cfg.get(JobConfigProperty.WorkflowID.name()));
}
- if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
- b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
+ if (cfg.containsKey(JobConfigProperty.TargetResource.name())) {
+ b.setTargetResource(cfg.get(JobConfigProperty.TargetResource.name()));
}
- if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
- b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
+ if (cfg.containsKey(JobConfigProperty.TargetPartitions.name())) {
+ b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TargetPartitions.name())));
}
- if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+ if (cfg.containsKey(JobConfigProperty.TargetPartitionStates.name())) {
b.setTargetPartitionStates(new HashSet<String>(
- Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
+ Arrays.asList(cfg.get(JobConfigProperty.TargetPartitionStates.name()).split(","))));
}
- if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
- b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
+ if (cfg.containsKey(JobConfigProperty.Command.name())) {
+ b.setCommand(cfg.get(JobConfigProperty.Command.name()));
}
- if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+ if (cfg.containsKey(JobConfigProperty.JobCommandConfig.name())) {
Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
- cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
+ cfg.get(JobConfigProperty.JobCommandConfig.name()));
b.setJobCommandConfigMap(commandConfigMap);
}
- if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
- b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
+ if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) {
+ b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name())));
}
- if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+ if (cfg.containsKey(JobConfigProperty.ConcurrentTasksPerInstance.name())) {
b.setNumConcurrentTasksPerInstance(
- Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
+ Integer.parseInt(cfg.get(JobConfigProperty.ConcurrentTasksPerInstance.name())));
}
- if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+ if (cfg.containsKey(JobConfigProperty.MaxAttemptsPerTask.name())) {
b.setMaxAttemptsPerTask(
- Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
+ Integer.parseInt(cfg.get(JobConfigProperty.MaxAttemptsPerTask.name())));
}
- if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+ if (cfg.containsKey(JobConfigProperty.MaxForcedReassignmentsPerTask.name())) {
b.setMaxForcedReassignmentsPerTask(
- Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
+ Integer.parseInt(cfg.get(JobConfigProperty.MaxForcedReassignmentsPerTask.name())));
}
- if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+ if (cfg.containsKey(JobConfigProperty.FailureThreshold.name())) {
b.setFailureThreshold(
- Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
+ Integer.parseInt(cfg.get(JobConfigProperty.FailureThreshold.name())));
}
- if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
- b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
+ if (cfg.containsKey(JobConfigProperty.TaskRetryDelay.name())) {
+ b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TaskRetryDelay.name())));
}
- if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+ if (cfg.containsKey(JobConfigProperty.DisableExternalView.name())) {
b.setDisableExternalView(
- Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
+ Boolean.valueOf(cfg.get(JobConfigProperty.DisableExternalView.name())));
}
- if (cfg.containsKey(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())) {
+ if (cfg.containsKey(JobConfigProperty.IgnoreDependentJobFailure.name())) {
b.setIgnoreDependentJobFailure(
- Boolean.valueOf(cfg.get(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())));
+ Boolean.valueOf(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
}
return b;
}
@@ -453,45 +443,45 @@ public class JobConfig {
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
+ String.format("%s cannot be null", JobConfigProperty.TargetResource));
}
if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
.isEmpty()) {
throw new IllegalArgumentException(
- String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
+ String.format("%s cannot be an empty set", JobConfigProperty.TargetPartitionStates));
}
if (_taskConfigMap.isEmpty() && _command == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.COMMAND));
+ String.format("%s cannot be null", JobConfigProperty.Command));
}
if (_timeoutPerTask < 0) {
throw new IllegalArgumentException(String
- .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+ .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition,
_timeoutPerTask));
}
if (_numConcurrentTasksPerInstance < 1) {
throw new IllegalArgumentException(String
- .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ .format("%s has invalid value %s", JobConfigProperty.ConcurrentTasksPerInstance,
_numConcurrentTasksPerInstance));
}
if (_maxAttemptsPerTask < 1) {
throw new IllegalArgumentException(String
- .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+ .format("%s has invalid value %s", JobConfigProperty.MaxAttemptsPerTask,
_maxAttemptsPerTask));
}
if (_maxForcedReassignmentsPerTask < 0) {
throw new IllegalArgumentException(String
- .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ .format("%s has invalid value %s", JobConfigProperty.MaxForcedReassignmentsPerTask,
_maxForcedReassignmentsPerTask));
}
if (_failureThreshold < 0) {
throw new IllegalArgumentException(String
- .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+ .format("%s has invalid value %s", JobConfigProperty.FailureThreshold,
_failureThreshold));
}
if (_workflow == null) {
throw new IllegalArgumentException(
- String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
+ String.format("%s cannot be null", JobConfigProperty.WorkflowID));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 73a5e58..32e1ffa 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -155,6 +154,10 @@ public class JobDag {
}
}
+ public int size() {
+ return _allNodes.size();
+ }
+
/**
* Checks that dag contains no cycles and all nodes are reachable.
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index 0280c88..c350fee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -29,35 +29,25 @@ import java.util.Map;
* A named queue to which jobs can be added
*/
public class JobQueue extends Workflow {
- /* Config fields */
- public static final String CAPACITY = "CAPACITY";
- private final int _capacity;
-
- private JobQueue(String name, int capacity, WorkflowConfig workflowConfig,
+ private JobQueue(String name, WorkflowConfig workflowConfig,
Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
super(name, workflowConfig, jobConfigs, taskConfigs);
- _capacity = capacity;
- validate();
}
/**
* Determine the number of jobs that this queue can accept before rejecting further jobs
+ * This method is deprecated, please use:
+ * JobQueue.getWorkflowConfig().getCapacity();
* @return queue capacity
*/
+ @Deprecated
public int getCapacity() {
- return _capacity;
- }
-
- public Map<String, String> getResourceConfigMap() throws Exception {
- Map<String, String> cfgMap = _workflowConfig.getResourceConfigMap();
- cfgMap.put(CAPACITY, String.valueOf(_capacity));
- return cfgMap;
+ return _workflowConfig.getCapacity();
}
/** Supports creation of a single queue */
public static class Builder extends Workflow.Builder {
- private int _capacity = Integer.MAX_VALUE;
private List<String> jobs;
public Builder(String name) {
@@ -65,42 +55,74 @@ public class JobQueue extends Workflow {
jobs = new ArrayList<String>();
}
- public Builder expiry(long expiry) {
- _expiry = expiry;
- return this;
- }
-
- public Builder capacity(int capacity) {
- _capacity = capacity;
- return this;
- }
-
+ /**
+ * Do not use this method, use workflowConfigBuilder.setCapacity() instead.
+ * If you set capacity via this method, the number given here
+ * will override capacity number set from other places.
+ * @param capacity
+ * @return
+ */
@Override
- public Builder fromMap(Map<String, String> cfg) {
- super.fromMap(cfg);
- if (cfg.containsKey(CAPACITY)) {
- _capacity = Integer.parseInt(cfg.get(CAPACITY));
- }
+ public Builder setCapacity(int capacity) {
+ super.setCapacity(capacity);
return this;
}
- public void enqueueJob(final String job, JobConfig.Builder jobBuilder) {
- if (jobs.size() >= _capacity) {
- throw new HelixException("Failed to push new job to jobQueue, it is already full");
+ public Builder enqueueJob(final String job, JobConfig.Builder jobBuilder) {
+ if (_workflowConfigBuilder != null) {
+ if (jobs.size() >= _workflowConfigBuilder.getCapacity()) {
+ throw new HelixException("Failed to push new job to jobQueue, it is already full");
+ }
}
- addJobConfig(job, jobBuilder);
+
+ addJob(job, jobBuilder);
if (jobs.size() > 0) {
String previousJob = jobs.get(jobs.size() - 1);
addParentChildDependency(previousJob, job);
}
jobs.add(job);
+ return this;
+ }
+
+ /**
+ * Please use setWorkflowConfigMap() instead.
+ * @param workflowCfgMap
+ * @return
+ */
+ @Override
+ public Builder fromMap(Map<String, String> workflowCfgMap) {
+ return setWorkflowConfigMap(workflowCfgMap);
+ }
+
+ @Override
+ public Builder setWorkflowConfigMap(Map<String, String> workflowCfgMap) {
+ super.setWorkflowConfigMap(workflowCfgMap);
+ return this;
+ }
+
+ @Override
+ public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
+ super.setWorkflowConfig(workflowConfig);
+ return this;
+ }
+
+ @Override
+ public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+ super.setScheduleConfig(scheduleConfig);
+ return this;
+ }
+
+ @Override
+ public Builder setExpiry(long expiry) {
+ super.setExpiry(expiry);
+ return this;
}
+ @Override
public JobQueue build() {
- WorkflowConfig.Builder builder = buildWorkflowConfig();
- builder.setTerminable(false);
- WorkflowConfig workflowConfig = builder.build();
- return new JobQueue(_name, _capacity, workflowConfig, _jobConfigs, _taskConfigs);
+ buildConfig();
+ _workflowConfigBuilder.setTerminable(false);
+ return new JobQueue(_name, _workflowConfigBuilder.build(), _jobConfigs, _taskConfigs);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c3eb8bd..99bcb62 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -45,7 +45,6 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
@@ -292,7 +291,8 @@ public class TaskDriver {
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
for (String resourceName : toRemove) {
for (String child : jobDag.getDirectChildren(resourceName)) {
jobDag.getChildrenToParents().get(child).remove(resourceName);
@@ -305,7 +305,8 @@ public class TaskDriver {
jobDag.getAllNodes().remove(resourceName);
}
try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
@@ -432,7 +433,8 @@ public class TaskDriver {
return null;
}
// Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
if (!allNodes.contains(namespacedJobName)) {
LOG.warn(
@@ -458,7 +460,8 @@ public class TaskDriver {
// Save the updated DAG
try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
throw new IllegalStateException(
"Could not remove job " + jobName + " from DAG of queue " + queueName, e);
@@ -509,18 +512,16 @@ public class TaskDriver {
public void enqueueJob(final String queueName, final String jobName,
JobConfig.Builder jobBuilder) {
// Get the job queue config and capacity
- HelixProperty workflowConfig =
- _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_accessor, queueName);
if (workflowConfig == null) {
- throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+ throw new IllegalArgumentException("Queue " + queueName + " config does not yet exist!");
}
- boolean isTerminable =
- workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+ boolean isTerminable = workflowConfig.isTerminable();
if (isTerminable) {
throw new IllegalArgumentException(queueName + " is not a queue!");
}
- final int capacity =
- workflowConfig.getRecord().getIntField(JobQueue.CAPACITY, Integer.MAX_VALUE);
+
+ final int capacity = workflowConfig.getCapacity();
// Create the job to ensure that it validates
JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
@@ -535,9 +536,10 @@ public class TaskDriver {
@Override
public ZNRecord update(ZNRecord currentData) {
// Add the node to the existing DAG
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
- if (allNodes.size() >= capacity) {
+ if (capacity > 0 && allNodes.size() >= capacity) {
throw new IllegalStateException(
"Queue " + queueName + " is at capacity, will not add " + jobName);
}
@@ -561,7 +563,8 @@ public class TaskDriver {
// Save the updated DAG
try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
e);
@@ -689,7 +692,8 @@ public class TaskDriver {
// Only update target state for non-completed workflows
String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
- currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+ currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+ state.name());
} else {
LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 49622f3..513c14e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,14 +20,9 @@ package org.apache.helix.task;
*/
import java.io.IOException;
-import java.text.ParseException;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
@@ -36,7 +31,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -46,7 +40,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -319,34 +312,6 @@ public class TaskUtil {
}
}
- /**
- * Get a ScheduleConfig from a workflow config string map
- *
- * @param cfg the string map
- * @return a ScheduleConfig if one exists, otherwise null
- */
- public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
- // Parse schedule-specific configs, if they exist
- Date startTime = null;
- if (cfg.containsKey(WorkflowConfig.START_TIME)) {
- try {
- startTime = WorkflowConfig.getDefaultDateFormat().parse(cfg.get(WorkflowConfig.START_TIME));
- } catch (ParseException e) {
- LOG.error("Unparseable date " + cfg.get(WorkflowConfig.START_TIME), e);
- return null;
- }
- }
- if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) && cfg
- .containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
- return ScheduleConfig
- .recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
- Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
- } else if (startTime != null) {
- return ScheduleConfig.oneTimeDelayedStart(startTime);
- }
- return null;
- }
-
private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.resourceConfig(resource));
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 1706bec..f3abc2e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.helix.HelixException;
import org.apache.helix.task.beans.JobBean;
import org.apache.helix.task.beans.TaskBean;
import org.apache.helix.task.beans.WorkflowBean;
@@ -87,6 +88,14 @@ public class Workflow {
}
/**
+ * @return Resource configuration key/value map.
+ * @throws HelixException
+ */
+ public Map<String, String> getResourceConfigMap() throws HelixException {
+ return _workflowConfig.getResourceConfigMap();
+ }
+
+ /**
* Parses the YAML description from a file into a {@link Workflow} object.
* @param file An abstract path name to the file containing the workflow description.
* @return A {@link Workflow} object.
@@ -134,7 +143,7 @@ public class Workflow {
private static Workflow parse(Reader reader) throws Exception {
Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
- Builder builder = new Builder(wf.name);
+ Builder workflowBuilder = new Builder(wf.name);
if (wf != null && wf.jobs != null) {
for (JobBean job : wf.jobs) {
@@ -144,53 +153,55 @@ public class Workflow {
if (job.parents != null) {
for (String parent : job.parents) {
- builder.addParentChildDependency(parent, job.name);
+ workflowBuilder.addParentChildDependency(parent, job.name);
}
}
- builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
- builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.WorkflowID.name(), wf.name);
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.Command.name(), job.command);
if (job.jobConfigMap != null) {
- builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(job.name, job.jobConfigMap);
}
- builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetResource.name(),
job.targetResource);
if (job.targetPartitionStates != null) {
- builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitionStates.name(),
Joiner.on(",").join(job.targetPartitionStates));
}
if (job.targetPartitions != null) {
- builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitions.name(),
Joiner.on(",").join(job.targetPartitions));
}
- builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.MaxAttemptsPerTask.name(),
String.valueOf(job.maxAttemptsPerTask));
- builder.addConfig(job.name,
- JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+ workflowBuilder.addConfig(job.name,
+ JobConfig.JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
String.valueOf(job.maxForcedReassignmentsPerTask));
- builder.addConfig(job.name,
- JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+ workflowBuilder.addConfig(job.name,
+ JobConfig.JobConfigProperty.ConcurrentTasksPerInstance.name(),
String.valueOf(job.numConcurrentTasksPerInstance));
- builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TimeoutPerPartition.name(),
String.valueOf(job.timeoutPerPartition));
- builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
+ workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.FailureThreshold.name(),
String.valueOf(job.failureThreshold));
if (job.tasks != null) {
List<TaskConfig> taskConfigs = Lists.newArrayList();
for (TaskBean task : job.tasks) {
taskConfigs.add(TaskConfig.from(task));
}
- builder.addTaskConfigs(job.name, taskConfigs);
+ workflowBuilder.addTaskConfigs(job.name, taskConfigs);
}
}
}
+ WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
if (wf.schedule != null) {
- builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
+ workflowCfgBuilder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
}
- builder.setExpiry(wf.expiry);
+ workflowCfgBuilder.setExpiry(wf.expiry);
+ workflowBuilder.setWorkflowConfig(workflowCfgBuilder.build());
- return builder.build();
+ return workflowBuilder.build();
}
/**
@@ -212,6 +223,13 @@ public class Workflow {
", names in dag but not in config: " + jobNamesInDagButNotInConfig);
}
+ int capacity = _workflowConfig.getCapacity();
+ int dagSize = _workflowConfig.getJobDag().size();
+ if (capacity > 0 && dagSize > capacity) {
+ throw new IllegalArgumentException(String.format(
+ "Failed to build workflow %s, number of jobs are more than its capacity! capacity(%d), jobs(%d)",
+ _name, capacity, dagSize));
+ }
_workflowConfig.getJobDag().validate();
for (String node : _jobConfigs.keySet()) {
@@ -234,17 +252,13 @@ public class Workflow {
protected JobDag _dag;
protected Map<String, Map<String, String>> _jobConfigs;
protected Map<String, List<TaskConfig>> _taskConfigs;
- protected ScheduleConfig _scheduleConfig;
- protected long _expiry = -1;
- protected Map<String, String> _cfgMap;
- protected int _parallelJobs = -1;
+ protected WorkflowConfig.Builder _workflowConfigBuilder;
public Builder(String name) {
_name = name;
_dag = new JobDag();
_jobConfigs = new TreeMap<String, Map<String, String>>();
_taskConfigs = new TreeMap<String, List<TaskConfig>>();
- _expiry = -1;
}
protected Builder addConfig(String job, String key, String val) {
@@ -258,11 +272,22 @@ public class Workflow {
}
private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
- return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
+ return addConfig(job, JobConfig.JobConfigProperty.JobCommandConfig.name(),
TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
}
+ /**
+ * Please use addJob() instead.
+ * @param job
+ * @param jobConfigBuilder
+ * @return
+ */
+ @Deprecated
public Builder addJobConfig(String job, JobConfig.Builder jobConfigBuilder) {
+ return addJob(job, jobConfigBuilder);
+ }
+
+ public Builder addJob(String job, JobConfig.Builder jobConfigBuilder) {
JobConfig jobConfig = jobConfigBuilder.setWorkflow(_name).build();
for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
String key = e.getKey();
@@ -294,62 +319,73 @@ public class Workflow {
return this;
}
- public Builder fromMap(Map<String, String> cfg) {
- _cfgMap = cfg;
+ /**
+ * Please use setWorkflowConfigMap() instead.
+ * @param workflowCfgMap
+ * @return
+ */
+ public Builder fromMap(Map<String, String> workflowCfgMap) {
+ return setWorkflowConfigMap(workflowCfgMap);
+ }
+
+ public Builder setWorkflowConfigMap(Map<String, String> workflowCfgMap) {
+ if (workflowCfgMap != null && !workflowCfgMap.isEmpty()) {
+ if (_workflowConfigBuilder == null) {
+ _workflowConfigBuilder = WorkflowConfig.Builder.fromMap(workflowCfgMap);
+ } else {
+ _workflowConfigBuilder.setConfigMap(workflowCfgMap);
+ }
+ }
+ return this;
+ }
+
+ public Builder setWorkflowConfig(WorkflowConfig workflowConfig) {
+ _workflowConfigBuilder = new WorkflowConfig.Builder(workflowConfig);
return this;
}
public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
- _scheduleConfig = scheduleConfig;
+ if (_workflowConfigBuilder == null) {
+ _workflowConfigBuilder = new WorkflowConfig.Builder();
+ }
+ _workflowConfigBuilder.setScheduleConfig(scheduleConfig);
return this;
}
public Builder setExpiry(long expiry) {
- _expiry = expiry;
+ if (_workflowConfigBuilder == null) {
+ _workflowConfigBuilder = new WorkflowConfig.Builder();
+ }
+ _workflowConfigBuilder.setExpiry(expiry);
return this;
}
- public String namespacify(String job) {
- return TaskUtil.getNamespacedJobName(_name, job);
+ @Deprecated
+ public Builder setCapacity(int capacity) {
+ if (_workflowConfigBuilder == null) {
+ _workflowConfigBuilder = new WorkflowConfig.Builder();
+ }
+ _workflowConfigBuilder.setCapacity(capacity);
+ return this;
}
- public Builder parallelJobs(int parallelJobs) {
- _parallelJobs = parallelJobs;
- return this;
+ public String namespacify(String job) {
+ return TaskUtil.getNamespacedJobName(_name, job);
}
public Workflow build() {
- WorkflowConfig.Builder builder = buildWorkflowConfig();
- // calls validate internally
- return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs);
+ buildConfig();
+ return new Workflow(_name, _workflowConfigBuilder.build(), _jobConfigs, _taskConfigs);
}
- protected WorkflowConfig.Builder buildWorkflowConfig() {
+ protected void buildConfig() {
for (String task : _jobConfigs.keySet()) {
- // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
- _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
- }
-
- WorkflowConfig.Builder builder;
- if (_cfgMap != null) {
- builder = WorkflowConfig.Builder.fromMap(_cfgMap);
- } else {
- builder = new WorkflowConfig.Builder();
+ _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WorkflowID.name(), _name);
}
-
- builder.setJobDag(_dag);
- builder.setTargetState(TargetState.START);
- if (_scheduleConfig != null) {
- builder.setScheduleConfig(_scheduleConfig);
+ if (_workflowConfigBuilder == null) {
+ _workflowConfigBuilder = new WorkflowConfig.Builder();
}
- if (_expiry > 0) {
- builder.setExpiry(_expiry);
- }
- if (_parallelJobs > 0) {
- builder.setParallelJobs(_parallelJobs);
- }
-
- return builder;
+ _workflowConfigBuilder.setJobDag(_dag);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 955cb77..db9fdba 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
* under the License.
*/
import java.io.IOException;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@@ -26,21 +27,34 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
/**
* Provides a typed interface to workflow level configurations. Validates the configurations.
*/
public class WorkflowConfig {
- /* Config fields */
- public static final String DAG = "Dag";
- public static final String PARALLEL_JOBS = "ParallelJobs";
- public static final String TARGET_STATE = "TargetState";
- public static final String EXPIRY = "Expiry";
- public static final String START_TIME = "StartTime";
- public static final String RECURRENCE_UNIT = "RecurrenceUnit";
- public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
- public static final String TERMINABLE = "Terminable";
- public static final String FAILURE_THRESHOLD = "FailureThreshold";
+ private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
+
+ /**
+ * Do not use these values directly, always use the getters/setters
+ * in WorkflowConfig and WorkflowConfig.Builder.
+ *
+ * For backward compatibility, this enum is left public for now,
+ * but it will be changed to protected in a future major release.
+ */
+ public enum WorkflowConfigProperty {
+ Dag,
+ ParallelJobs,
+ TargetState,
+ Expiry,
+ StartTime,
+ RecurrenceUnit,
+ RecurrenceInterval,
+ Terminable,
+ FailureThreshold,
+ /* This applies only to non-terminable workflows. */
+ capacity
+ }
/* Default values */
public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
@@ -59,9 +73,10 @@ public class WorkflowConfig {
private final boolean _terminable;
private final ScheduleConfig _scheduleConfig;
private final int _failureThreshold;
+ private final int _capacity;
protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
- int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig) {
+ int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity) {
_jobDag = jobDag;
_parallelJobs = parallelJobs;
_targetState = targetState;
@@ -69,6 +84,7 @@ public class WorkflowConfig {
_failureThreshold = failureThreshold;
_terminable = terminable;
_scheduleConfig = scheduleConfig;
+ _capacity = capacity;
}
public JobDag getJobDag() {
@@ -91,6 +107,13 @@ public class WorkflowConfig {
return _failureThreshold;
}
+ /**
+ * Returns the number of jobs that this workflow can accept before rejecting further jobs.
+ * This field is only used when the workflow is not terminable.
+ * @return queue capacity
+ */
+ public int getCapacity() { return _capacity; }
+
public boolean isTerminable() {
return _terminable;
}
@@ -128,15 +151,20 @@ public class WorkflowConfig {
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
try {
- cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+ cfgMap.put(WorkflowConfigProperty.Dag.name(), getJobDag().toJson());
} catch (IOException ex) {
throw new HelixException("Invalid job dag configuration!", ex);
}
- cfgMap.put(WorkflowConfig.PARALLEL_JOBS, String.valueOf(getParallelJobs()));
- cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
- cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
- cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
- cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(getFailureThreshold()));
+ cfgMap.put(WorkflowConfigProperty.ParallelJobs.name(), String.valueOf(getParallelJobs()));
+ cfgMap.put(WorkflowConfigProperty.Expiry.name(), String.valueOf(getExpiry()));
+ cfgMap.put(WorkflowConfigProperty.TargetState.name(), getTargetState().name());
+ cfgMap.put(WorkflowConfigProperty.Terminable.name(), String.valueOf(isTerminable()));
+ cfgMap.put(WorkflowConfigProperty.FailureThreshold.name(),
+ String.valueOf(getFailureThreshold()));
+
+ if (_capacity > 0) {
+ cfgMap.put(WorkflowConfigProperty.capacity.name(), String.valueOf(_capacity));
+ }
// Populate schedule if present
ScheduleConfig scheduleConfig = getScheduleConfig();
@@ -144,17 +172,50 @@ public class WorkflowConfig {
Date startTime = scheduleConfig.getStartTime();
if (startTime != null) {
String formattedTime = WorkflowConfig.getDefaultDateFormat().format(startTime);
- cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
+ cfgMap.put(WorkflowConfigProperty.StartTime.name(), formattedTime);
}
if (scheduleConfig.isRecurring()) {
- cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
- cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
- .toString());
+ cfgMap.put(WorkflowConfigProperty.RecurrenceUnit.name(),
+ scheduleConfig.getRecurrenceUnit().toString());
+ cfgMap.put(WorkflowConfigProperty.RecurrenceInterval.name(),
+ scheduleConfig.getRecurrenceInterval().toString());
}
}
return cfgMap;
}
+ /**
+ * Get a ScheduleConfig from a workflow config string map
+ *
+ * @param cfg the string map
+ * @return a ScheduleConfig if one exists, otherwise null
+ */
+ public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
+ // Parse schedule-specific configs, if they exist
+ Date startTime = null;
+ if (cfg.containsKey(WorkflowConfigProperty.StartTime.name())) {
+ try {
+ startTime = WorkflowConfig.getDefaultDateFormat()
+ .parse(cfg.get(WorkflowConfigProperty.StartTime.name()));
+ } catch (ParseException e) {
+ LOG.error(
+ "Unparseable date " + cfg.get(WorkflowConfigProperty.StartTime.name()),
+ e);
+ return null;
+ }
+ }
+ if (cfg.containsKey(WorkflowConfigProperty.RecurrenceUnit.name()) && cfg
+ .containsKey(WorkflowConfigProperty.RecurrenceInterval.name())) {
+ return ScheduleConfig.recurringFromDate(startTime,
+ TimeUnit.valueOf(cfg.get(WorkflowConfigProperty.RecurrenceUnit.name())),
+ Long.parseLong(
+ cfg.get(WorkflowConfigProperty.RecurrenceInterval.name())));
+ } else if (startTime != null) {
+ return ScheduleConfig.oneTimeDelayedStart(startTime);
+ }
+ return null;
+ }
+
public static class Builder {
private JobDag _taskDag = JobDag.EMPTY_DAG;
private int _parallelJobs = 1;
@@ -162,13 +223,14 @@ public class WorkflowConfig {
private long _expiry = DEFAULT_EXPIRY;
private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
private boolean _isTerminable = true;
+ private int _capacity = Integer.MAX_VALUE;
private ScheduleConfig _scheduleConfig;
public WorkflowConfig build() {
validate();
return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
- _isTerminable, _scheduleConfig);
+ _isTerminable, _scheduleConfig, _capacity);
}
public Builder() {}
@@ -180,9 +242,11 @@ public class WorkflowConfig {
_expiry = workflowConfig.getExpiry();
_isTerminable = workflowConfig.isTerminable();
_scheduleConfig = workflowConfig.getScheduleConfig();
+ _capacity = workflowConfig.getCapacity();
+ _failureThreshold = workflowConfig.getFailureThreshold();
}
- public Builder setJobDag(JobDag v) {
+ protected Builder setJobDag(JobDag v) {
_taskDag = v;
return this;
}
@@ -207,6 +271,11 @@ public class WorkflowConfig {
return this;
}
+ public Builder setCapacity(int capacity) {
+ _capacity = capacity;
+ return this;
+ }
+
public Builder setTerminable(boolean isTerminable) {
_isTerminable = isTerminable;
return this;
@@ -223,43 +292,95 @@ public class WorkflowConfig {
}
public static Builder fromMap(Map<String, String> cfg) {
- Builder b = new Builder();
- if (cfg.containsKey(EXPIRY)) {
- b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
+ Builder builder = new Builder();
+ builder.setConfigMap(cfg);
+ return builder;
+ }
+
+ public Builder setConfigMap(Map<String, String> cfg) {
+ if (cfg.containsKey(WorkflowConfigProperty.Expiry.name())) {
+ setExpiry(Long.parseLong(cfg.get(WorkflowConfigProperty.Expiry.name())));
}
- if (cfg.containsKey(FAILURE_THRESHOLD)) {
- b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+ if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
+ setFailureThreshold(
+ Integer.parseInt(cfg.get(WorkflowConfigProperty.FailureThreshold.name())));
}
- if (cfg.containsKey(DAG)) {
- b.setJobDag(JobDag.fromJson(cfg.get(DAG)));
+ if (cfg.containsKey(WorkflowConfigProperty.Dag.name())) {
+ setJobDag(JobDag.fromJson(cfg.get(WorkflowConfigProperty.Dag.name())));
}
- if (cfg.containsKey(TARGET_STATE)) {
- b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
+ if (cfg.containsKey(WorkflowConfigProperty.TargetState.name())) {
+ setTargetState(TargetState.valueOf(cfg.get(WorkflowConfigProperty.TargetState.name())));
}
- if (cfg.containsKey(TERMINABLE)) {
- b.setTerminable(Boolean.parseBoolean(cfg.get(TERMINABLE)));
+ if (cfg.containsKey(WorkflowConfigProperty.Terminable.name())) {
+ setTerminable(Boolean.parseBoolean(cfg.get(WorkflowConfigProperty.Terminable.name())));
}
- if (cfg.containsKey(PARALLEL_JOBS)) {
- String value = cfg.get(PARALLEL_JOBS);
+ if (cfg.containsKey(WorkflowConfigProperty.ParallelJobs.name())) {
+ String value = cfg.get(WorkflowConfigProperty.ParallelJobs.name());
if (value == null) {
- b.setParallelJobs(1);
+ setParallelJobs(1);
} else {
- b.setParallelJobs(Integer.parseInt(value));
+ setParallelJobs(Integer.parseInt(value));
+ }
+ }
+
+ if (cfg.containsKey(WorkflowConfigProperty.capacity.name())) {
+ int capacity = Integer.valueOf(cfg.get(WorkflowConfigProperty.capacity.name()));
+ if (capacity > 0) {
+ setCapacity(capacity);
+ }
+ }
+
+ if (cfg.containsKey(WorkflowConfigProperty.FailureThreshold.name())) {
+ int threshold = Integer.valueOf(cfg.get(WorkflowConfigProperty.FailureThreshold.name()));
+ if (threshold >= 0) {
+ setFailureThreshold(threshold);
}
}
// Parse schedule-specific configs, if they exist
- ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);
+ ScheduleConfig scheduleConfig = parseScheduleFromConfigMap(cfg);
if (scheduleConfig != null) {
- b.setScheduleConfig(scheduleConfig);
+ setScheduleConfig(scheduleConfig);
}
- return b;
+ return this;
+ }
+
+ public int getParallelJobs() {
+ return _parallelJobs;
+ }
+
+ public TargetState getTargetState() {
+ return _targetState;
+ }
+
+ public long getExpiry() {
+ return _expiry;
+ }
+
+ public int getFailureThreshold() {
+ return _failureThreshold;
+ }
+
+ public boolean isTerminable() {
+ return _isTerminable;
+ }
+
+ public int getCapacity() {
+ return _capacity;
+ }
+
+ public ScheduleConfig getScheduleConfig() {
+ return _scheduleConfig;
+ }
+
+ public JobDag getJobDag() {
+ return _taskDag;
}
private void validate() {
if (_expiry < 0) {
- throw new IllegalArgumentException(
- String.format("%s has invalid value %s", EXPIRY, _expiry));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", WorkflowConfigProperty.Expiry.name(), _expiry));
} else if (_scheduleConfig != null && !_scheduleConfig.isValid()) {
throw new IllegalArgumentException(
"Scheduler configuration is invalid. The configuration must have a start time if it is "
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 682ac77..9d1106a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -75,11 +75,6 @@ public class WorkflowRebalancer extends TaskRebalancer {
if (targetState == TargetState.STOP) {
LOG.info("Workflow " + workflow + "is marked as stopped.");
- // Workflow has been stopped if all jobs are stopped
- // TODO: what should we do if workflowCtx is not set yet?
- if (workflowCtx != null && isWorkflowStopped(workflowCtx, workflowCfg)) {
- workflowCtx.setWorkflowState(TaskState.STOPPED);
- }
return buildEmptyAssignment(workflow, currStateOutput);
}
@@ -339,25 +334,24 @@ public class WorkflowRebalancer extends TaskRebalancer {
}
// Create a new workflow with a new name
- HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
- Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
- JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
- Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
- Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
-
- // Set the workflow expiry
- workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+ Map<String, String> workflowConfigsMap =
+ resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
+ WorkflowConfig.Builder workflowConfigBlder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
// Set the schedule, if applicable
- ScheduleConfig scheduleConfig;
if (newStartTime != null) {
- scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
- } else {
- scheduleConfig = TaskUtil.parseScheduleFromConfigMap(wfSimpleFields);
- }
- if (scheduleConfig != null) {
- workflowBuilder.setScheduleConfig(scheduleConfig);
+ ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+ workflowConfigBlder.setScheduleConfig(scheduleConfig);
}
+ workflowConfigBlder.setTerminable(true);
+
+ WorkflowConfig workflowConfig = workflowConfigBlder.build();
+
+ JobDag jobDag = workflowConfig.getJobDag();
+ Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+
+ Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
+ workflowBuilder.setWorkflowConfig(workflowConfig);
// Add each job back as long as the original exists
Set<String> namespacedJobs = jobDag.getAllNodes();
@@ -454,10 +448,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
// Remove DAG references in workflow
PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
+ @Override public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
- JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+ JobDag jobDag = JobDag.fromJson(
+ currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
for (String child : jobDag.getDirectChildren(job)) {
jobDag.getChildrenToParents().get(child).remove(job);
}
@@ -468,7 +462,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
jobDag.getParentsToChildren().remove(job);
jobDag.getAllNodes().remove(job);
try {
- currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+ currentData
+ .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
LOG.error("Could not update DAG for job: " + job, e);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 422ec88..3e5385c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -22,20 +22,19 @@ import java.util.Arrays;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
-import org.apache.helix.HelixManager;
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.testng.Assert;
@@ -121,8 +120,7 @@ public class TaskTestUtil {
final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
- @Override
- public boolean verify() throws Exception {
+ @Override public boolean verify() throws Exception {
WorkflowContext ctx = driver.getWorkflowContext(workflowName);
return ctx == null || ctx.getJobState(namespacedJobName) == null;
}
@@ -242,36 +240,46 @@ public class TaskTestUtil {
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
int recurrenInSeconds) {
- Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
- cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(recurrenInSeconds));
- cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
+ return buildRecurrentJobQueue(jobQueueName, delayStart, recurrenInSeconds, null);
+ }
+
+ public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
+ int recurrenInSeconds, TargetState targetState) {
+ WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
+ workflowCfgBuilder.setExpiry(120000);
+ if (targetState != null) { // null keeps the builder's default target state
+ workflowCfgBuilder.setTargetState(targetState); // apply the requested state, not a hard-coded STOP
+ }
+
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
cal.set(Calendar.MILLISECOND, 0);
- cfgMap.put(WorkflowConfig.START_TIME,
- WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
- return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+ ScheduleConfig scheduleConfig =
+ ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenInSeconds);
+ workflowCfgBuilder.setScheduleConfig(scheduleConfig);
+ return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
return buildRecurrentJobQueue(jobQueueName, 0);
}
- public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, int failureThreshold) {
- Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+ public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
+ int failureThreshold) {
+ WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
+ workflowCfgBuilder.setExpiry(120000);
+
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
cal.set(Calendar.MILLISECOND, 0);
- cfgMap.put(WorkflowConfig.START_TIME,
- WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+ workflowCfgBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(cal.getTime()));
+
if (failureThreshold > 0) {
- cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(failureThreshold));
+ workflowCfgBuilder.setFailureThreshold(failureThreshold);
}
- return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+ return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index ae3d52d..b2e61ca 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -40,14 +40,13 @@ import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
@@ -237,11 +236,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
for (int i = 0; i <= 4; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
- JobConfig.Builder job =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setJobCommandConfigMap(commandConfig)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
LOG.info("Enqueuing job: " + jobName);
queueBuilder.enqueueJob(jobName, job);
@@ -331,10 +328,9 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
for (int i = 0; i < JOB_COUNTS; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
- JobConfig.Builder job =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(commandConfig)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
jobs.add(job);
jobNames.add(targetPartition.toLowerCase() + "Job" + i);
}
@@ -370,6 +366,42 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
}
@Test
+ public void testCreateStoppedQueue() throws InterruptedException {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000,
+ TargetState.STOP);
+ // Create and Enqueue jobs
+ List<String> currentJobNames = new ArrayList<String>();
+ for (int i = 0; i <= 1; i++) {
+ String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ String jobName = targetPartition.toLowerCase() + "Job" + i;
+ queueBuild.enqueueJob(jobName, jobConfig);
+ currentJobNames.add(jobName);
+ }
+
+ _driver.createQueue(queueBuild.build());
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+ _driver.resume(queueName);
+
+ //TaskTestUtil.pollForWorkflowState(_driver, queueName, );
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+
+ // ensure current schedule is started
+ String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+ }
+
+ @Test
public void testGetNoExistWorkflowConfig() {
String randomName = "randomJob";
WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName);
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 580f5ac..b091748 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -41,6 +41,7 @@ import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -136,14 +137,16 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
}
}
- @Test
- public void test() throws Exception {
+ @Test public void test() throws Exception {
final int PARALLEL_COUNT = 2;
String queueName = TestHelper.getTestMethodName();
- JobQueue.Builder queueBuild = new JobQueue.Builder(queueName);
- queueBuild.parallelJobs(PARALLEL_COUNT);
+ WorkflowConfig.Builder cfgBuilder = new WorkflowConfig.Builder();
+ cfgBuilder.setParallelJobs(PARALLEL_COUNT);
+
+ JobQueue.Builder queueBuild =
+ new JobQueue.Builder(queueName).setWorkflowConfig(cfgBuilder.build());
JobQueue queue = queueBuild.build();
_driver.createQueue(queue);
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index 964f9e1..2e53b36 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -34,6 +34,7 @@ import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
@@ -50,10 +51,8 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -63,7 +62,6 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
private static final int START_PORT = 12918;
private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
private static final String TIMEOUT_CONFIG = "Timeout";
- private static final String TGT_DB = "TestDB";
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -92,8 +90,9 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
}
// Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
- setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+ setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+ MASTER_SLAVE_STATE_MODEL);
+ setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
@@ -151,27 +150,13 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
}
@Test
- public void testUpdateQueueConfig() throws InterruptedException {
+ public void testUpdateRunningQueue() throws InterruptedException {
String queueName = TestHelper.getTestMethodName();
// Create a queue
LOG.info("Starting job-queue: " + queueName);
- JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
- // Create and Enqueue jobs
- List<String> currentJobNames = new ArrayList<String>();
- for (int i = 0; i <= 1; i++) {
- String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
-
- JobConfig.Builder jobConfig =
- new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
- .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
- .setTargetPartitionStates(Sets.newHashSet(targetPartition));
- String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuild.enqueueJob(jobName, jobConfig);
- currentJobNames.add(jobName);
- }
-
- _driver.start(queueBuild.build());
+ JobQueue queue = createDefaultRecurrentJobQueue(queueName, 2);
+ _driver.start(queue);
WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
@@ -209,5 +194,75 @@ public class TestUpdateWorkflow extends ZkIntegrationTestBase {
startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
}
+
+ @Test
+ public void testUpdateStoppedQueue() throws InterruptedException {
+ String queueName = TestHelper.getTestMethodName();
+
+ // Create a queue
+ LOG.info("Starting job-queue: " + queueName);
+ JobQueue queue = createDefaultRecurrentJobQueue(queueName, 2);
+ _driver.start(queue);
+
+ WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+
+ // ensure current schedule is started
+ String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS);
+
+ _driver.stop(queueName);
+
+ WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+ WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig);
+ Calendar startTime = Calendar.getInstance();
+ startTime.set(Calendar.SECOND, startTime.get(Calendar.SECOND) + 1);
+
+ ScheduleConfig scheduleConfig =
+ ScheduleConfig.recurringFromDate(startTime.getTime(), TimeUnit.MINUTES, 2);
+
+ configBuilder.setScheduleConfig(scheduleConfig);
+
+ _driver.updateWorkflow(queueName, configBuilder.build());
+
+ workflowConfig = _driver.getWorkflowConfig(queueName);
+ Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP);
+
+ _driver.resume(queueName);
+
+ // ensure current schedule is completed
+ TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+
+ Thread.sleep(1000);
+
+ wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+ scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+ WorkflowConfig wCfg = _driver.getWorkflowConfig(scheduledQueue);
+
+ Calendar configStartTime = Calendar.getInstance();
+ configStartTime.setTime(wCfg.getStartTime());
+
+ Assert.assertTrue(
+ (startTime.get(Calendar.HOUR_OF_DAY) == configStartTime.get(Calendar.HOUR_OF_DAY) &&
+ startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
+ startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
+ }
+
+ /**
+ * Builds a recurrent job queue (no start delay, recurrence value 600000) and enqueues
+ * exactly {@code numJobs} jobs on it. The first job targets MASTER partitions and every
+ * later job targets SLAVE partitions.
+ *
+ * @param queueName name of the queue to build
+ * @param numJobs number of jobs to enqueue
+ * @return the built queue
+ */
+ private JobQueue createDefaultRecurrentJobQueue(String queueName, int numJobs) {
+ JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
+ // Strict bound: "<=" would enqueue numJobs + 1 jobs.
+ for (int i = 0; i < numJobs; i++) {
+ String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+ JobConfig.Builder jobConfig =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+ String jobName = targetPartition.toLowerCase() + "Job" + i;
+ queueBuild.enqueueJob(jobName, jobConfig);
+ }
+
+ return queueBuild.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d386aff3/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index ce3a36a..639cdff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -31,8 +31,6 @@ import org.apache.log4j.Logger;
* Convenience class for generating various test workflows
*/
public class WorkflowGenerator {
- private static final Logger LOG = Logger.getLogger(WorkflowGenerator.class);
-
public static final String DEFAULT_TGT_DB = "TestDB";
public static final String JOB_NAME_1 = "SomeJob1";
public static final String JOB_NAME_2 = "SomeJob2";
@@ -54,13 +52,6 @@ public class WorkflowGenerator {
DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
}
- private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
- static {
- JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
- builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
- DEFAULT_JOB_BUILDER = builder;
- }
-
public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);