You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:11 UTC
[15/33] helix git commit: More cleanup on workflow and workflowConfig
builders.
More cleanup on workflow and workflowConfig builders.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1f683b86
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1f683b86
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1f683b86
Branch: refs/heads/helix-0.6.x
Commit: 1f683b863df23f16bd893fc675f88ed8b7f3d3b8
Parents: b6b89de
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Mar 30 13:59:59 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:58:18 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 40 ++++++++++++++++
.../java/org/apache/helix/task/Workflow.java | 50 ++------------------
.../org/apache/helix/task/WorkflowConfig.java | 11 +++++
.../apache/helix/task/WorkflowRebalancer.java | 2 +-
.../org/apache/helix/task/beans/JobBean.java | 4 +-
.../task/TestIndependentTaskRebalancer.java | 12 ++---
.../integration/task/WorkflowGenerator.java | 4 +-
7 files changed, 67 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index d423d38..4d5aa94 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -30,7 +30,10 @@ import java.util.Set;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.helix.task.beans.JobBean;
+import org.apache.helix.task.beans.TaskBean;
/**
* Provides a typed interface to job configurations.
@@ -485,6 +488,43 @@ public class JobConfig {
}
}
+ public static Builder from(JobBean jobBean) {
+ Builder b = new Builder();
+
+ b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
+ .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask)
+ .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
+ .setTimeoutPerTask(jobBean.timeoutPerPartition)
+ .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
+ .setDisableExternalView(jobBean.disableExternalView)
+ .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure);
+
+ if (jobBean.jobCommandConfigMap != null) {
+ b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
+ }
+ if (jobBean.command != null) {
+ b.setCommand(jobBean.command);
+ }
+ if (jobBean.targetResource != null) {
+ b.setTargetResource(jobBean.targetResource);
+ }
+ if (jobBean.targetPartitionStates != null) {
+ b.setTargetPartitionStates(new HashSet<String>(jobBean.targetPartitionStates));
+ }
+ if (jobBean.targetPartitions != null) {
+ b.setTargetPartitions(jobBean.targetPartitions);
+ }
+ if (jobBean.tasks != null) {
+ List<TaskConfig> taskConfigs = Lists.newArrayList();
+ for (TaskBean task : jobBean.tasks) {
+ taskConfigs.add(TaskConfig.Builder.from(task));
+ }
+ b.addTaskConfigs(taskConfigs);
+ }
+
+ return b;
+ }
+
private static List<String> csvToStringList(String csv) {
String[] vals = csv.split(",");
return Arrays.asList(vals);
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index e077f47..a7060c3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -34,14 +34,10 @@ import java.util.TreeMap;
import org.apache.helix.HelixException;
import org.apache.helix.task.beans.JobBean;
-import org.apache.helix.task.beans.TaskBean;
import org.apache.helix.task.beans.WorkflowBean;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
/**
* Houses a job dag and config set to fully describe a job workflow
*/
@@ -150,56 +146,18 @@ public class Workflow {
if (job.name == null) {
throw new IllegalArgumentException("A job must have a name.");
}
-
+ JobConfig.Builder jobConfigBuilder = JobConfig.Builder.from(job);
+ jobConfigBuilder.setWorkflow(wf.name);
+ workflowBuilder.addJob(job.name, jobConfigBuilder);
if (job.parents != null) {
for (String parent : job.parents) {
workflowBuilder.addParentChildDependency(parent, job.name);
}
}
-
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.WorkflowID.name(), wf.name);
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.Command.name(), job.command);
- if (job.jobConfigMap != null) {
- workflowBuilder.addJobCommandConfigMap(job.name, job.jobConfigMap);
- }
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetResource.name(),
- job.targetResource);
- if (job.targetPartitionStates != null) {
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitionStates.name(),
- Joiner.on(",").join(job.targetPartitionStates));
- }
- if (job.targetPartitions != null) {
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitions.name(),
- Joiner.on(",").join(job.targetPartitions));
- }
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.MaxAttemptsPerTask.name(),
- String.valueOf(job.maxAttemptsPerTask));
- workflowBuilder.addConfig(job.name,
- JobConfig.JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
- String.valueOf(job.maxForcedReassignmentsPerTask));
- workflowBuilder.addConfig(job.name,
- JobConfig.JobConfigProperty.ConcurrentTasksPerInstance.name(),
- String.valueOf(job.numConcurrentTasksPerInstance));
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TimeoutPerPartition.name(),
- String.valueOf(job.timeoutPerPartition));
- workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.FailureThreshold.name(),
- String.valueOf(job.failureThreshold));
- if (job.tasks != null) {
- List<TaskConfig> taskConfigs = Lists.newArrayList();
- for (TaskBean task : job.tasks) {
- taskConfigs.add(TaskConfig.Builder.from(task));
- }
- workflowBuilder.addTaskConfigs(job.name, taskConfigs);
- }
}
}
- WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
- if (wf.schedule != null) {
- workflowCfgBuilder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
- }
- workflowCfgBuilder.setExpiry(wf.expiry);
- workflowBuilder.setWorkflowConfig(workflowCfgBuilder.build());
+ workflowBuilder.setWorkflowConfig(WorkflowConfig.Builder.from(wf).build());
return workflowBuilder.build();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index db9fdba..844bdf0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixException;
+import org.apache.helix.task.beans.WorkflowBean;
import org.apache.log4j.Logger;
/**
@@ -377,6 +378,16 @@ public class WorkflowConfig {
return _taskDag;
}
+ public static Builder from(WorkflowBean workflowBean) {
+ WorkflowConfig.Builder b = new WorkflowConfig.Builder();
+ if (workflowBean.schedule != null) {
+ b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule));
+ }
+ b.setExpiry(workflowBean.expiry);
+
+ return b;
+ }
+
private void validate() {
if (_expiry < 0) {
throw new IllegalArgumentException(String
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8f97cce..2d4ca75 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -372,7 +372,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
taskConfigs.add(taskConfig);
}
jobCfgBuilder.addTaskConfigs(taskConfigs);
- workflowBuilder.addJobConfig(job, jobCfgBuilder);
+ workflowBuilder.addJob(job, jobCfgBuilder);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 32fd5ac..a570026 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -34,7 +34,7 @@ public class JobBean {
public List<String> targetPartitionStates;
public List<String> targetPartitions;
public String command;
- public Map<String, String> jobConfigMap;
+ public Map<String, String> jobCommandConfigMap;
public List<TaskBean> tasks;
public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
@@ -42,4 +42,6 @@ public class JobBean {
public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
+ public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
+ public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1c58776..046281e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -152,7 +152,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -179,7 +179,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
.addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -207,7 +207,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder =
new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
@@ -235,7 +235,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
.setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
_driver.start(workflowBuilder.build());
@@ -268,7 +268,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
.addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
@@ -301,7 +301,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
.setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJobConfig(jobName, jobBuilder);
+ workflowBuilder.addJob(jobName, jobBuilder);
SingleFailTask.hasFailed = false;
_driver.start(workflowBuilder.build());
http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 639cdff..d428460 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -70,8 +70,8 @@ public class WorkflowGenerator {
JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
- builder.addJobConfig(JOB_NAME_1, jobBuilder);
- builder.addJobConfig(JOB_NAME_2, jobBuilder);
+ builder.addJob(JOB_NAME_1, jobBuilder);
+ builder.addJob(JOB_NAME_2, jobBuilder);
return builder;
}