You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/06/16 23:36:11 UTC
[2/4] helix git commit: [HELIX-623] Do not expose internal
configuration field name. Client should use JobConfig.Builder to create
jobConfig.
[HELIX-623] Do not expose internal configuration field name. Client should use JobConfig.Builder to create jobConfig.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c490fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c490fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c490fa
Branch: refs/heads/helix-0.6.x
Commit: 79c490fab080494b68a1f52845c1e708b8881439
Parents: 2409601
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Feb 10 15:59:37 2016 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/helix/model/ResourceConfig.java | 61 +++++
.../java/org/apache/helix/task/JobConfig.java | 258 +++++++++++--------
.../java/org/apache/helix/task/TaskDriver.java | 1 +
.../java/org/apache/helix/task/TaskUtil.java | 21 +-
.../java/org/apache/helix/task/Workflow.java | 37 +--
.../task/TestIndependentTaskRebalancer.java | 93 +++----
.../integration/task/TestRecurringJobQueue.java | 4 +-
.../integration/task/TestTaskRebalancer.java | 39 +--
.../task/TestTaskRebalancerRetryLimit.java | 18 +-
.../task/TestTaskRebalancerStopResume.java | 10 +-
.../integration/task/WorkflowGenerator.java | 56 ++--
11 files changed, 354 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 98433f5..d58126d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -23,6 +23,9 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
+import java.util.Collections;
+import java.util.Map;
+
/**
* Resource configurations
*/
@@ -73,6 +76,64 @@ public class ResourceConfig extends HelixProperty {
.setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), monitoringDisabled);
}
+ /**
+ * Put a set of simple configs.
+ *
+ * @param configsMap
+ */
+ public void putSimpleConfigs(Map<String, String> configsMap) {
+ getRecord().getSimpleFields().putAll(configsMap);
+ }
+
+ /**
+ * Get all simple configurations.
+ *
+ * @return all simple configurations.
+ */
+ public Map<String, String> getSimpleConfigs() {
+ return Collections.unmodifiableMap(getRecord().getSimpleFields());
+ }
+
+ /**
+ * Put a single simple config value.
+ *
+ * @param configKey
+ * @param configVal
+ */
+ public void putSimpleConfig(String configKey, String configVal) {
+ getRecord().getSimpleFields().put(configKey, configVal);
+ }
+
+ /**
+ * Get a single simple config value.
+ *
+ * @param configKey
+ * @return configuration value, or NULL if not exist.
+ */
+ public String getSimpleConfig(String configKey) {
+ return getRecord().getSimpleFields().get(configKey);
+ }
+
+ /**
+ * Put a single map config.
+ *
+ * @param configKey
+ * @param configValMap
+ */
+ public void putMapConfig(String configKey, Map<String, String> configValMap) {
+ getRecord().setMapField(configKey, configValMap);
+ }
+
+ /**
+ * Get a single map config.
+ *
+ * @param configKey
+ * @return configuration value map, or NULL if not exist.
+ */
+ public Map<String, String> getMapConfig(String configKey) {
+ return getRecord().getMapField(configKey);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceConfig) {
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index c7c2f38..37a2f35 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -36,49 +36,87 @@ import com.google.common.collect.Maps;
* Provides a typed interface to job configurations.
*/
public class JobConfig {
- // // Property names ////
-
- /** The name of the workflow to which the job belongs. */
- public static final String WORKFLOW_ID = "WorkflowID";
- /** The assignment strategy of this job */
- public static final String ASSIGNMENT_STRATEGY = "AssignmentStrategy";
- /** The name of the target resource. */
- public static final String TARGET_RESOURCE = "TargetResource";
- /**
- * The set of the target partition states. The value must be a comma-separated list of partition
- * states.
- */
- public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
+
/**
- * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+ * Do not use this value directly, always use the get/set methods in JobConfig and JobConfig.Builder.
*/
- public static final String TARGET_PARTITIONS = "TargetPartitions";
- /** The command that is to be run by participants in the case of identical tasks. */
- public static final String COMMAND = "Command";
- /** The command configuration to be used by the tasks. */
- public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
- /** The timeout for a task. */
- public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
- /** The maximum number of times the task rebalancer may attempt to execute a task. */
- public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask";
- /** The maximum number of times Helix will intentionally move a failing task */
- public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask";
- /** The number of concurrent tasks that are allowed to run on an instance. */
- public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
- /** The number of tasks within the job that are allowed to fail. */
- public static final String FAILURE_THRESHOLD = "FailureThreshold";
- /** The amount of time in ms to wait before retrying a task */
- public static final String TASK_RETRY_DELAY = "TaskRetryDelay";
-
- /** The individual task configurations, if any **/
- public static final String TASK_CONFIGS = "TaskConfigs";
-
- /** Disable external view (not showing) for this job resource */
- public static final String DISABLE_EXTERNALVIEW = "DisableExternalView";
-
-
- // // Default property values ////
+ protected enum JobConfigProperty {
+ /**
+ * The name of the workflow to which the job belongs.
+ */
+ WORKFLOW_ID("WorkflowID"),
+ /**
+ * The assignment strategy of this job
+ */
+ ASSIGNMENT_STRATEGY("AssignmentStrategy"),
+ /**
+ * The name of the target resource.
+ */
+ TARGET_RESOURCE("TargetResource"),
+ /**
+ * The set of the target partition states. The value must be a comma-separated list of partition
+ * states.
+ */
+ TARGET_PARTITION_STATES("TargetPartitionStates"),
+ /**
+ * The set of the target partition ids. The value must be a comma-separated list of partition ids.
+ */
+ TARGET_PARTITIONS("TargetPartitions"),
+ /**
+ * The command that is to be run by participants in the case of identical tasks.
+ */
+ COMMAND("Command"),
+ /**
+ * The command configuration to be used by the tasks.
+ */
+ JOB_COMMAND_CONFIG_MAP("JobCommandConfig"),
+ /**
+ * The timeout for a task.
+ */
+ TIMEOUT_PER_TASK("TimeoutPerPartition"),
+ /**
+ * The maximum number of times the task rebalancer may attempt to execute a task.
+ */
+ MAX_ATTEMPTS_PER_TASK("MaxAttemptsPerTask"),
+ /**
+ * The maximum number of times Helix will intentionally move a failing task
+ */
+ MAX_FORCED_REASSIGNMENTS_PER_TASK("MaxForcedReassignmentsPerTask"),
+ /**
+ * The number of concurrent tasks that are allowed to run on an instance.
+ */
+ NUM_CONCURRENT_TASKS_PER_INSTANCE("ConcurrentTasksPerInstance"),
+ /**
+ * The number of tasks within the job that are allowed to fail.
+ */
+ FAILURE_THRESHOLD("FailureThreshold"),
+ /**
+ * The amount of time in ms to wait before retrying a task
+ */
+ TASK_RETRY_DELAY("TaskRetryDelay"),
+
+ /**
+ * The individual task configurations, if any *
+ */
+ TASK_CONFIGS("TaskConfigs"),
+
+ /**
+ * Disable external view (not showing) for this job resource
+ */
+ DISABLE_EXTERNALVIEW("DisableExternalView");
+
+ private final String _value;
+
+ private JobConfigProperty(String val) {
+ _value = val;
+ }
+
+ public String value() {
+ return _value;
+ }
+ }
+ //Default property values
public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -106,8 +144,7 @@ public class JobConfig {
Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
- boolean disableExternalView,
- Map<String, TaskConfig> taskConfigMap) {
+ boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -190,34 +227,39 @@ public class JobConfig {
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
- cfgMap.put(JobConfig.WORKFLOW_ID, _workflow);
+ cfgMap.put(JobConfigProperty.WORKFLOW_ID.value(), _workflow);
if (_command != null) {
- cfgMap.put(JobConfig.COMMAND, _command);
+ cfgMap.put(JobConfigProperty.COMMAND.value(), _command);
}
if (_jobCommandConfigMap != null) {
String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
if (serializedConfig != null) {
- cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
+ cfgMap.put(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(), serializedConfig);
}
}
if (_targetResource != null) {
- cfgMap.put(JobConfig.TARGET_RESOURCE, _targetResource);
+ cfgMap.put(JobConfigProperty.TARGET_RESOURCE.value(), _targetResource);
}
if (_targetPartitionStates != null) {
- cfgMap.put(JobConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+ cfgMap.put(JobConfigProperty.TARGET_PARTITION_STATES.value(),
+ Joiner.on(",").join(_targetPartitionStates));
}
if (_targetPartitions != null) {
- cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+ cfgMap
+ .put(JobConfigProperty.TARGET_PARTITIONS.value(), Joiner.on(",").join(_targetPartitions));
}
if (_retryDelay > 0) {
- cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay);
+ cfgMap.put(JobConfigProperty.TASK_RETRY_DELAY.value(), "" + _retryDelay);
}
- cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask);
- cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask);
- cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
- cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
- cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
- cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
+ cfgMap.put(JobConfigProperty.TIMEOUT_PER_TASK.value(), "" + _timeoutPerTask);
+ cfgMap.put(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(), "" + _maxAttemptsPerTask);
+ cfgMap.put(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
+ "" + _maxForcedReassignmentsPerTask);
+ cfgMap.put(JobConfigProperty.FAILURE_THRESHOLD.value(), "" + _failureThreshold);
+ cfgMap.put(JobConfigProperty.DISABLE_EXTERNALVIEW.value(),
+ Boolean.toString(_disableExternalView));
+ cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
+ "" + _numConcurrentTasksPerInstance);
return cfgMap;
}
@@ -251,54 +293,58 @@ public class JobConfig {
/**
* Convenience method to build a {@link JobConfig} from a {@code Map<String, String>}.
+ *
* @param cfg A map of property names to their string representations.
* @return A {@link Builder}.
*/
public static Builder fromMap(Map<String, String> cfg) {
Builder b = new Builder();
- if (cfg.containsKey(WORKFLOW_ID)) {
- b.setWorkflow(cfg.get(WORKFLOW_ID));
+ if (cfg.containsKey(JobConfigProperty.WORKFLOW_ID.value())) {
+ b.setWorkflow(cfg.get(JobConfigProperty.WORKFLOW_ID.value()));
}
- if (cfg.containsKey(TARGET_RESOURCE)) {
- b.setTargetResource(cfg.get(TARGET_RESOURCE));
+ if (cfg.containsKey(JobConfigProperty.TARGET_RESOURCE.value())) {
+ b.setTargetResource(cfg.get(JobConfigProperty.TARGET_RESOURCE.value()));
}
- if (cfg.containsKey(TARGET_PARTITIONS)) {
- b.setTargetPartitions(csvToStringList(cfg.get(TARGET_PARTITIONS)));
+ if (cfg.containsKey(JobConfigProperty.TARGET_PARTITIONS.value())) {
+ b.setTargetPartitions(csvToStringList(cfg.get(JobConfigProperty.TARGET_PARTITIONS.value())));
}
- if (cfg.containsKey(TARGET_PARTITION_STATES)) {
- b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
- TARGET_PARTITION_STATES).split(","))));
+ if (cfg.containsKey(JobConfigProperty.TARGET_PARTITION_STATES.value())) {
+ b.setTargetPartitionStates(new HashSet<String>(
+ Arrays.asList(cfg.get(JobConfigProperty.TARGET_PARTITION_STATES.value()).split(","))));
}
- if (cfg.containsKey(COMMAND)) {
- b.setCommand(cfg.get(COMMAND));
+ if (cfg.containsKey(JobConfigProperty.COMMAND.value())) {
+ b.setCommand(cfg.get(JobConfigProperty.COMMAND.value()));
}
- if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
- Map<String, String> commandConfigMap =
- TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+ if (cfg.containsKey(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value())) {
+ Map<String, String> commandConfigMap = TaskUtil.deserializeJobCommandConfigMap(
+ cfg.get(JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value()));
b.setJobCommandConfigMap(commandConfigMap);
}
- if (cfg.containsKey(TIMEOUT_PER_TASK)) {
- b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.TIMEOUT_PER_TASK.value())) {
+ b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TIMEOUT_PER_TASK.value())));
}
- if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
- b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
- .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
+ if (cfg.containsKey(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())) {
+ b.setNumConcurrentTasksPerInstance(
+ Integer.parseInt(cfg.get(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value())));
}
- if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) {
- b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())) {
+ b.setMaxAttemptsPerTask(
+ Integer.parseInt(cfg.get(JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value())));
}
- if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) {
- b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg
- .get(MAX_FORCED_REASSIGNMENTS_PER_TASK)));
+ if (cfg.containsKey(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())) {
+ b.setMaxForcedReassignmentsPerTask(
+ Integer.parseInt(cfg.get(JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value())));
}
- if (cfg.containsKey(FAILURE_THRESHOLD)) {
- b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+ if (cfg.containsKey(JobConfigProperty.FAILURE_THRESHOLD.value())) {
+ b.setFailureThreshold(
+ Integer.parseInt(cfg.get(JobConfigProperty.FAILURE_THRESHOLD.value())));
}
- if (cfg.containsKey(TASK_RETRY_DELAY)) {
- b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY)));
+ if (cfg.containsKey(JobConfigProperty.TASK_RETRY_DELAY.value())) {
+ b.setTaskRetryDelay(Long.parseLong(cfg.get(JobConfigProperty.TASK_RETRY_DELAY.value())));
}
- if (cfg.containsKey(DISABLE_EXTERNALVIEW)) {
- b.setDisableExternalView(Boolean.valueOf(cfg.get(DISABLE_EXTERNALVIEW)));
+ if (cfg.containsKey(JobConfigProperty.DISABLE_EXTERNALVIEW.value())) {
+ b.setDisableExternalView(
+ Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
}
return b;
}
@@ -384,38 +430,46 @@ public class JobConfig {
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.TARGET_RESOURCE));
}
- if (_taskConfigMap.isEmpty() && _targetPartitionStates != null
- && _targetPartitionStates.isEmpty()) {
- throw new IllegalArgumentException(String.format("%s cannot be an empty set",
- TARGET_PARTITION_STATES));
+ if (_taskConfigMap.isEmpty() && _targetPartitionStates != null && _targetPartitionStates
+ .isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("%s cannot be an empty set", JobConfigProperty.TARGET_PARTITION_STATES));
}
if (_taskConfigMap.isEmpty() && _command == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.COMMAND));
}
if (_timeoutPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- TIMEOUT_PER_TASK, _timeoutPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.TIMEOUT_PER_TASK,
+ _timeoutPerTask));
}
if (_numConcurrentTasksPerInstance < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ _numConcurrentTasksPerInstance));
}
if (_maxAttemptsPerTask < 1) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.MAX_ATTEMPTS_PER_TASK,
+ _maxAttemptsPerTask));
}
if (_maxForcedReassignmentsPerTask < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ _maxForcedReassignmentsPerTask));
}
if (_failureThreshold < 0) {
- throw new IllegalArgumentException(String.format("%s has invalid value %s",
- FAILURE_THRESHOLD, _failureThreshold));
+ throw new IllegalArgumentException(String
+ .format("%s has invalid value %s", JobConfigProperty.FAILURE_THRESHOLD,
+ _failureThreshold));
}
if (_workflow == null) {
- throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
+ throw new IllegalArgumentException(
+ String.format("%s cannot be null", JobConfigProperty.WORKFLOW_ID));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 9b64aec..c4986ee 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -55,6 +55,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d804fab..524b889 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -402,10 +402,10 @@ public class TaskUtil {
Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
- Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+ Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
// Set the workflow expiry
- builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+ workflowBuilder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
// Set the schedule, if applicable
ScheduleConfig scheduleConfig;
@@ -415,7 +415,7 @@ public class TaskUtil {
scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
}
if (scheduleConfig != null) {
- builder.setScheduleConfig(scheduleConfig);
+ workflowBuilder.setScheduleConfig(scheduleConfig);
}
// Add each job back as long as the original exists
@@ -426,29 +426,30 @@ public class TaskUtil {
String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
- jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
- for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
- builder.addConfig(job, e.getKey(), e.getValue());
- }
+
+ JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
+
+ jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
- builder.addTaskConfigs(job, taskConfigs);
+ jobCfgBuilder.addTaskConfigs(taskConfigs);
+ workflowBuilder.addJobConfig(job, jobCfgBuilder);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
if (children != null) {
for (String namespacedChild : children) {
String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
- builder.addParentChildDependency(job, child);
+ workflowBuilder.addParentChildDependency(job, child);
}
}
}
}
- return builder.build();
+ return workflowBuilder.build();
}
private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 8ea2691..3a050c2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -128,7 +128,9 @@ public class Workflow {
return parse(new StringReader(yaml));
}
- /** Helper function to parse workflow from a generic {@link Reader} */
+ /**
+ * Helper function to parse workflow from a generic {@link Reader}
+ */
private static Workflow parse(Reader reader) throws Exception {
Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
@@ -146,29 +148,32 @@ public class Workflow {
}
}
- builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
- builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.WORKFLOW_ID.value(), wf.name);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.COMMAND.value(), job.command);
if (job.jobConfigMap != null) {
builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
}
- builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_RESOURCE.value(),
+ job.targetResource);
if (job.targetPartitionStates != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITION_STATES.value(),
Joiner.on(",").join(job.targetPartitionStates));
}
if (job.targetPartitions != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TARGET_PARTITIONS.value(),
Joiner.on(",").join(job.targetPartitions));
}
- builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.MAX_ATTEMPTS_PER_TASK.value(),
String.valueOf(job.maxAttemptsPerTask));
- builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ builder.addConfig(job.name,
+ JobConfig.JobConfigProperty.MAX_FORCED_REASSIGNMENTS_PER_TASK.value(),
String.valueOf(job.maxForcedReassignmentsPerTask));
- builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ builder.addConfig(job.name,
+ JobConfig.JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
String.valueOf(job.numConcurrentTasksPerInstance));
- builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.TIMEOUT_PER_TASK.value(),
String.valueOf(job.timeoutPerPartition));
- builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD,
+ builder.addConfig(job.name, JobConfig.JobConfigProperty.FAILURE_THRESHOLD.value(),
String.valueOf(job.failureThreshold));
if (job.tasks != null) {
List<TaskConfig> taskConfigs = Lists.newArrayList();
@@ -242,7 +247,7 @@ public class Workflow {
_expiry = -1;
}
- public Builder addConfig(String job, String key, String val) {
+ private Builder addConfig(String job, String key, String val) {
job = namespacify(job);
_dag.addNode(job);
if (!_jobConfigs.containsKey(job)) {
@@ -252,8 +257,8 @@ public class Workflow {
return this;
}
- public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
- return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+ private Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap) {
+ return addConfig(job, JobConfig.JobConfigProperty.JOB_COMMAND_CONFIG_MAP.value(),
TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
}
@@ -268,7 +273,7 @@ public class Workflow {
return this;
}
- public Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
+ private Builder addTaskConfigs(String job, Collection<TaskConfig> taskConfigs) {
job = namespacify(job);
_dag.addNode(job);
if (!_taskConfigs.containsKey(job)) {
@@ -322,7 +327,7 @@ public class Workflow {
protected WorkflowConfig.Builder buildWorkflowConfig() {
for (String task : _jobConfigs.keySet()) {
// addConfig(task, TaskConfig.WORKFLOW_ID, _name);
- _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
+ _jobConfigs.get(task).put(JobConfig.JobConfigProperty.WORKFLOW_ID.value(), _name);
}
WorkflowConfig.Builder builder;
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index a00a736..40c2485 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -140,8 +140,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
_runCounts.clear();
}
- @Test
- public void testDifferentTasks() throws Exception {
+ @Test public void testDifferentTasks() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -150,11 +149,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, true);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -166,8 +166,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testThresholdFailure() throws Exception {
+ @Test public void testThresholdFailure() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -177,12 +176,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
+ .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -194,8 +193,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testOptionalTaskFailure() throws Exception {
+ @Test public void testOptionalTaskFailure() throws Exception {
// Create a job with two different tasks
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
@@ -205,11 +203,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null, false);
taskConfigs.add(taskConfig1);
taskConfigs.add(taskConfig2);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -221,24 +222,23 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
}
- @Test
- public void testReassignment() throws Exception {
+ @Test public void testReassignment() throws Exception {
final int NUM_INSTANCES = 2;
String jobName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
- Map<String, String> taskConfigMap =
- Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_'
- + START_PORT));
+ Map<String, String> taskConfigMap = Maps.newHashMap(
+ ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + START_PORT));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, ""
- + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -251,8 +251,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
// Ensure that this was tried on two different instances, the first of which exhausted the
// attempts number, and the other passes on the first try
Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
- Assert.assertTrue(_runCounts.values().contains(
- JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
+ Assert.assertTrue(
+ _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
Assert.assertTrue(_runCounts.values().contains(1));
}
@@ -264,11 +264,14 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Map<String, String> taskConfigMap = Maps.newHashMap();
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- Map<String, String> jobConfigMap = Maps.newHashMap();
- jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+ jobCommandMap.put("Timeout", "1000");
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
_driver.start(workflowBuilder.build());
@@ -295,11 +298,13 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
Map<String, String> taskConfigMap = Maps.newHashMap();
TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false);
taskConfigs.add(taskConfig1);
- workflowBuilder.addTaskConfigs(jobName, taskConfigs);
- workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
- workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay));
- Map<String, String> jobConfigMap = Maps.newHashMap();
- workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
+ Map<String, String> jobCommandMap = Maps.newHashMap();
+
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+ .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap);
+ workflowBuilder.addJobConfig(jobName, jobBuilder);
+
SingleFailTask.hasFailed = false;
_driver.start(workflowBuilder.build());
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 79adcd5..da13ada 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -215,12 +215,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
for (int i = 0; i <= 1; i++) {
String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
- JobConfig.Builder job =
+ JobConfig.Builder jobConfig =
new JobConfig.Builder().setCommand("Reindex")
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
.setTargetPartitionStates(Sets.newHashSet(targetPartition));
String jobName = targetPartition.toLowerCase() + "Job" + i;
- queueBuild.enqueueJob(jobName, job);
+ queueBuild.enqueueJob(jobName, jobConfig);
currentJobNames.add(jobName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f402b82..3352d1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -163,9 +163,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
String jobName = "Expiry";
long expiry = 1000;
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
- Workflow flow =
- WorkflowGenerator
- .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
+
+ Workflow flow = WorkflowGenerator
+ .generateSingleJobWorkflowBuilder(jobName, jobBuilder)
.setExpiry(expiry).build();
_driver.start(flow);
@@ -204,9 +206,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
final String jobResource = "basic" + jobCompletionTime;
Map<String, String> commandConfig =
ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- commandConfig).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait for job completion
@@ -220,18 +225,20 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
}
}
- @Test
- public void partitionSet() throws Exception {
+ @Test public void partitionSet() throws Exception {
final String jobResource = "partitionSet";
ImmutableList<String> targetPartitions =
ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
// construct and submit our basic workflow
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1)
+ .setTargetPartitions(targetPartitions);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
- JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// wait for job completeness/timeout
@@ -268,13 +275,15 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
}
}
- @Test
- public void timeouts() throws Exception {
+ @Test public void timeouts() throws Exception {
final String jobResource = "timeouts";
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+ .setMaxAttemptsPerTask(2).setTimeoutPerTask(100);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
// Wait until the job reports failure.
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index b678d7e..efe90b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -125,18 +125,15 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
_manager.disconnect();
}
- @Test
- public void test() throws Exception {
+ @Test public void test() throws Exception {
String jobResource = TestHelper.getTestMethodName();
+
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+ .setMaxAttemptsPerTask(2).setCommand("ErrorTask").setFailureThreshold(Integer.MAX_VALUE);
+
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
- WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(2)).build();
- Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs();
- for (Map<String, String> jobConfig : jobConfigs.values()) {
- jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
- jobConfig.put(JobConfig.COMMAND, "ErrorTask");
- }
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
_driver.start(flow);
@@ -151,7 +148,6 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
}
}
-
}
private static class ErrorTask implements Task {
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 8a44672..7437b72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -162,12 +162,14 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
_manager.disconnect();
}
- @Test
- public void stopAndResume() throws Exception {
+ @Test public void stopAndResume() throws Exception {
Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+
+ JobConfig.Builder jobBuilder =
+ JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(commandConfig);
Workflow flow =
- WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
- commandConfig).build();
+ WorkflowGenerator.generateSingleJobWorkflowBuilder(JOB_RESOURCE, jobBuilder).build();
LOG.info("Starting flow " + flow.getName());
_driver.start(flow);
http://git-wip-us.apache.org/repos/asf/helix/blob/79c490fa/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index a414f5c..23c35af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -56,58 +56,34 @@ public class WorkflowGenerator {
DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
}
- public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
- String jobName, Map<String, String> commandConfig, String... cfgs) {
- if (cfgs.length % 2 != 0) {
- throw new IllegalArgumentException(
- "Additional configs should have even number of keys and values");
- }
- Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG);
- for (int i = 0; i < cfgs.length; i += 2) {
- bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
- }
-
- return bldr;
+ private static final JobConfig.Builder DEFAULT_JOB_BUILDER;
+ static {
+ JobConfig.Builder builder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ builder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+ DEFAULT_JOB_BUILDER = builder;
}
public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
- return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+ return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
}
public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
- Map<String, String> commandConfig, Map<String, String> config) {
- Workflow.Builder builder = new Workflow.Builder(jobName);
- for (String key : config.keySet()) {
- builder.addConfig(jobName, key, config.get(key));
- }
- if (commandConfig != null) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- String serializedMap = mapper.writeValueAsString(commandConfig);
- builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- } catch (IOException e) {
- LOG.error("Error serializing " + commandConfig, e);
- }
- }
- return builder;
+ JobConfig.Builder jobBuilder) {
+ return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);
}
public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
Workflow.Builder builder = new Workflow.Builder(workflowName);
builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
- for (String key : DEFAULT_JOB_CONFIG.keySet()) {
- builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
- builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
- }
- ObjectMapper mapper = new ObjectMapper();
- try {
- String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
- builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
- } catch (IOException e) {
- LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
- }
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
+ jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+ builder.addJobConfig(JOB_NAME_1, jobBuilder);
+ builder.addJobConfig(JOB_NAME_2, jobBuilder);
+
return builder;
}
}