You are viewing a plain text version of this content. (The canonical link is not preserved in this plain text copy.)
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:11 UTC

[15/33] helix git commit: More cleanup on workflow and workflowConfig builders.

More cleanup on workflow and workflowConfig builders.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1f683b86
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1f683b86
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1f683b86

Branch: refs/heads/helix-0.6.x
Commit: 1f683b863df23f16bd893fc675f88ed8b7f3d3b8
Parents: b6b89de
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Mar 30 13:59:59 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Jul 5 14:58:18 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 40 ++++++++++++++++
 .../java/org/apache/helix/task/Workflow.java    | 50 ++------------------
 .../org/apache/helix/task/WorkflowConfig.java   | 11 +++++
 .../apache/helix/task/WorkflowRebalancer.java   |  2 +-
 .../org/apache/helix/task/beans/JobBean.java    |  4 +-
 .../task/TestIndependentTaskRebalancer.java     | 12 ++---
 .../integration/task/WorkflowGenerator.java     |  4 +-
 7 files changed, 67 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index d423d38..4d5aa94 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -30,7 +30,10 @@ import java.util.Set;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.helix.task.beans.JobBean;
+import org.apache.helix.task.beans.TaskBean;
 
 /**
  * Provides a typed interface to job configurations.
@@ -485,6 +488,43 @@ public class JobConfig {
       }
     }
 
+    public static Builder from(JobBean jobBean) {
+      Builder b = new Builder();
+
+      b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
+          .setMaxForcedReassignmentsPerTask(jobBean.maxForcedReassignmentsPerTask)
+          .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
+          .setTimeoutPerTask(jobBean.timeoutPerPartition)
+          .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
+          .setDisableExternalView(jobBean.disableExternalView)
+          .setIgnoreDependentJobFailure(jobBean.ignoreDependentJobFailure);
+
+      if (jobBean.jobCommandConfigMap != null) {
+        b.setJobCommandConfigMap(jobBean.jobCommandConfigMap);
+      }
+      if (jobBean.command != null) {
+        b.setCommand(jobBean.command);
+      }
+      if (jobBean.targetResource != null) {
+        b.setTargetResource(jobBean.targetResource);
+      }
+      if (jobBean.targetPartitionStates != null) {
+        b.setTargetPartitionStates(new HashSet<String>(jobBean.targetPartitionStates));
+      }
+      if (jobBean.targetPartitions != null) {
+        b.setTargetPartitions(jobBean.targetPartitions);
+      }
+      if (jobBean.tasks != null) {
+        List<TaskConfig> taskConfigs = Lists.newArrayList();
+        for (TaskBean task : jobBean.tasks) {
+          taskConfigs.add(TaskConfig.Builder.from(task));
+        }
+        b.addTaskConfigs(taskConfigs);
+      }
+
+      return b;
+    }
+
     private static List<String> csvToStringList(String csv) {
       String[] vals = csv.split(",");
       return Arrays.asList(vals);

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index e077f47..a7060c3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -34,14 +34,10 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.task.beans.JobBean;
-import org.apache.helix.task.beans.TaskBean;
 import org.apache.helix.task.beans.WorkflowBean;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
 /**
  * Houses a job dag and config set to fully describe a job workflow
  */
@@ -150,56 +146,18 @@ public class Workflow {
         if (job.name == null) {
           throw new IllegalArgumentException("A job must have a name.");
         }
-
+        JobConfig.Builder jobConfigBuilder = JobConfig.Builder.from(job);
+        jobConfigBuilder.setWorkflow(wf.name);
+        workflowBuilder.addJob(job.name, jobConfigBuilder);
         if (job.parents != null) {
           for (String parent : job.parents) {
             workflowBuilder.addParentChildDependency(parent, job.name);
           }
         }
-
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.WorkflowID.name(), wf.name);
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.Command.name(), job.command);
-        if (job.jobConfigMap != null) {
-          workflowBuilder.addJobCommandConfigMap(job.name, job.jobConfigMap);
-        }
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetResource.name(),
-            job.targetResource);
-        if (job.targetPartitionStates != null) {
-          workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitionStates.name(),
-              Joiner.on(",").join(job.targetPartitionStates));
-        }
-        if (job.targetPartitions != null) {
-          workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TargetPartitions.name(),
-              Joiner.on(",").join(job.targetPartitions));
-        }
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.MaxAttemptsPerTask.name(),
-            String.valueOf(job.maxAttemptsPerTask));
-        workflowBuilder.addConfig(job.name,
-            JobConfig.JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
-            String.valueOf(job.maxForcedReassignmentsPerTask));
-        workflowBuilder.addConfig(job.name,
-            JobConfig.JobConfigProperty.ConcurrentTasksPerInstance.name(),
-            String.valueOf(job.numConcurrentTasksPerInstance));
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.TimeoutPerPartition.name(),
-            String.valueOf(job.timeoutPerPartition));
-        workflowBuilder.addConfig(job.name, JobConfig.JobConfigProperty.FailureThreshold.name(),
-            String.valueOf(job.failureThreshold));
-        if (job.tasks != null) {
-          List<TaskConfig> taskConfigs = Lists.newArrayList();
-          for (TaskBean task : job.tasks) {
-            taskConfigs.add(TaskConfig.Builder.from(task));
-          }
-          workflowBuilder.addTaskConfigs(job.name, taskConfigs);
-        }
       }
     }
 
-    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder();
-    if (wf.schedule != null) {
-      workflowCfgBuilder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
-    }
-    workflowCfgBuilder.setExpiry(wf.expiry);
-    workflowBuilder.setWorkflowConfig(workflowCfgBuilder.build());
+    workflowBuilder.setWorkflowConfig(WorkflowConfig.Builder.from(wf).build());
 
     return workflowBuilder.build();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index db9fdba..844bdf0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixException;
+import org.apache.helix.task.beans.WorkflowBean;
 import org.apache.log4j.Logger;
 
 /**
@@ -377,6 +378,16 @@ public class WorkflowConfig {
       return _taskDag;
     }
 
+    public static Builder from(WorkflowBean workflowBean) {
+      WorkflowConfig.Builder b = new WorkflowConfig.Builder();
+      if (workflowBean.schedule != null) {
+        b.setScheduleConfig(ScheduleConfig.from(workflowBean.schedule));
+      }
+      b.setExpiry(workflowBean.expiry);
+
+      return b;
+    }
+
     private void validate() {
       if (_expiry < 0) {
         throw new IllegalArgumentException(String

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 8f97cce..2d4ca75 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -372,7 +372,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
           taskConfigs.add(taskConfig);
         }
         jobCfgBuilder.addTaskConfigs(taskConfigs);
-        workflowBuilder.addJobConfig(job, jobCfgBuilder);
+        workflowBuilder.addJob(job, jobCfgBuilder);
 
         // Add dag dependencies
         Set<String> children = parentsToChildren.get(namespacedJob);

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 32fd5ac..a570026 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -34,7 +34,7 @@ public class JobBean {
   public List<String> targetPartitionStates;
   public List<String> targetPartitions;
   public String command;
-  public Map<String, String> jobConfigMap;
+  public Map<String, String> jobCommandConfigMap;
   public List<TaskBean> tasks;
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
@@ -42,4 +42,6 @@ public class JobBean {
   public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK;
   public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD;
   public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY;
+  public boolean disableExternalView = JobConfig.DEFAULT_DISABLE_EXTERNALVIEW;
+  public boolean ignoreDependentJobFailure = JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 1c58776..046281e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -152,7 +152,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
             .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -179,7 +179,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setCommand("DummyCommand").setFailureThreshold(1)
             .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobConfigMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -207,7 +207,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder =
         new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
             .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
 
     _driver.start(workflowBuilder.build());
 
@@ -235,7 +235,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
         .setMaxForcedReassignmentsPerTask(NUM_INSTANCES - 1).addTaskConfigs(taskConfigs)
         .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
 
     _driver.start(workflowBuilder.build());
 
@@ -268,7 +268,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
         .addTaskConfigs(taskConfigs)
         .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
 
     long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
     workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
@@ -301,7 +301,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
         .setTaskRetryDelay(delay).addTaskConfigs(taskConfigs)
         .setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJobConfig(jobName, jobBuilder);
+    workflowBuilder.addJob(jobName, jobBuilder);
 
     SingleFailTask.hasFailed = false;
     _driver.start(workflowBuilder.build());

http://git-wip-us.apache.org/repos/asf/helix/blob/1f683b86/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 639cdff..d428460 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -70,8 +70,8 @@ public class WorkflowGenerator {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
 
-    builder.addJobConfig(JOB_NAME_1, jobBuilder);
-    builder.addJobConfig(JOB_NAME_2, jobBuilder);
+    builder.addJob(JOB_NAME_1, jobBuilder);
+    builder.addJob(JOB_NAME_2, jobBuilder);
 
     return builder;
   }