Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:29 UTC

[46/50] [abbrv] git commit: [HELIX-416] Support recurring scheduled tasks

[HELIX-416] Support recurring scheduled tasks


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0f79187d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0f79187d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0f79187d

Branch: refs/heads/master
Commit: 0f79187d31c8769a668ba81f8bcc5e5831c659da
Parents: 346d8a3
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Jun 23 13:59:59 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 9 09:48:57 2014 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/ScheduleConfig.java   |  29 ++-
 .../org/apache/helix/task/TaskConstants.java    |   4 +
 .../java/org/apache/helix/task/TaskDriver.java  |  36 +++-
 .../org/apache/helix/task/TaskRebalancer.java   | 134 +++++++++---
 .../java/org/apache/helix/task/TaskUtil.java    | 211 +++++++++++++++++++
 .../java/org/apache/helix/task/Workflow.java    |   4 +-
 .../org/apache/helix/task/WorkflowConfig.java   |  39 +---
 .../org/apache/helix/task/WorkflowContext.java  |   9 +
 .../apache/helix/task/beans/WorkflowBean.java   |   4 +-
 9 files changed, 392 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
index 9e3801e..b123793 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -87,12 +87,6 @@ public class ScheduleConfig {
    * @return true if valid, false if invalid
    */
   public boolean isValid() {
-    // For now, disallow recurring workflows
-    if (isRecurring()) {
-      LOG.error("Recurring workflows are not currently supported.");
-      return false;
-    }
-
     // All schedules must have a start time even if they are recurring
     if (_startTime == null) {
       LOG.error("All schedules must have a start time!");
@@ -141,25 +135,28 @@ public class ScheduleConfig {
     return new ScheduleConfig(startTime, null, null);
   }
 
-  /*
+  /**
    * Create a schedule for a recurring workflow that should start immediately
    * @param recurUnit the unit of the recurrence interval
    * @param recurInterval the magnitude of the recurrence interval
    * @return instantiated ScheduleConfig
-   * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
-   * return new ScheduleConfig(new Date(), recurUnit, recurInterval);
-   * }
    */
+  public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
+    return new ScheduleConfig(new Date(), recurUnit, recurInterval);
+  }
 
-  /*
+  /**
    * Create a schedule for a recurring workflow that should start at a specific time
-   * @param startTime the time to start the workflow the first time
+   * @param startTime the time to start the workflow the first time, or null to start now
    * @param recurUnit the unit of the recurrence interval
    * @param recurInterval the magnitude of the recurrence interval
    * @return instantiated ScheduleConfig
-   * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
-   * long recurInterval) {
-   * return new ScheduleConfig(startTime, recurUnit, recurInterval);
-   * }
    */
+  public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
+      long recurInterval) {
+    if (startTime == null) {
+      startTime = new Date();
+    }
+    return new ScheduleConfig(startTime, recurUnit, recurInterval);
+  }
 }
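
For illustration, a minimal sketch of how the factory methods restored above might be used; only the ScheduleConfig calls visible in this diff are assumed, and the workflow that would consume these configs is omitted:

    import java.util.Date;
    import java.util.concurrent.TimeUnit;

    import org.apache.helix.task.ScheduleConfig;

    public class RecurringScheduleSketch {
      public static void main(String[] args) {
        // Recur every 30 minutes, starting immediately
        ScheduleConfig everyHalfHour = ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 30);

        // Recur daily, starting at an explicit date; a null start time falls back to "now"
        ScheduleConfig daily = ScheduleConfig.recurringFromDate(new Date(), TimeUnit.DAYS, 1);

        // Both schedules should still pass the validation at the top of this class
        System.out.println(everyHalfHour.isValid() + " " + daily.isValid());
      }
    }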

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 305323d..34008d6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -39,4 +39,8 @@ public class TaskConstants {
    * The root property store path at which the {@link TaskRebalancer} stores context information.
    */
   public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
+  /**
+   * Resource prefix for scheduled workflows
+   */
+  public static final String SCHEDULED = "SCHEDULED";
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d5e9101..0610c01 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -34,6 +35,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
@@ -221,11 +224,38 @@ public class TaskDriver {
 
   /** Helper function to change target state for a given task */
   private void setTaskTargetState(String jobResource, TargetState state) {
+    setSingleTaskTargetState(jobResource, state);
+
+    // For recurring schedules, child workflows must also be handled
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    HelixProperty p = new HelixProperty(jobResource);
-    p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
-    accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p);
+    List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+    for (String resource : resources) {
+      String prefix = jobResource + "_" + TaskConstants.SCHEDULED;
+      if (resource.startsWith(prefix)) {
+        setSingleTaskTargetState(resource, state);
+      }
+    }
+  }
 
+  /** Helper function to change target state for a single task resource */
+  private void setSingleTaskTargetState(String jobResource, final TargetState state) {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Only update target state for non-completed workflows
+        String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
+        if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) {
+          currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+        }
+        return currentData;
+      }
+    };
+    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+    updaters.add(updater);
+    List<String> paths = Lists.newArrayList();
+    paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath());
+    accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
     invokeRebalance();
   }
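
A hedged sketch of pausing and resuming a recurring workflow after this change; it assumes TaskDriver's public stop/resume entry points (not shown in this diff) delegate to the setTaskTargetState helper modified here, so spawned <workflow>_SCHEDULED_<n> children that have not yet finished are updated along with the template:

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    public class RecurringWorkflowControlSketch {
      // Stop the recurring template and its unfinished scheduled children
      public static void stopRecurring(HelixManager manager, String workflow) throws Exception {
        new TaskDriver(manager).stop(workflow);
      }

      // Resume the template; the next clone is spawned by the rebalancer on its schedule
      public static void resumeRecurring(HelixManager manager, String workflow) throws Exception {
        new TaskDriver(manager).resume(workflow);
      }
    }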
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 37c8548..0e11d21 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
@@ -57,6 +58,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -65,12 +67,12 @@ import com.google.common.collect.Sets;
 public abstract class TaskRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
 
-  /** Management of already-scheduled workflows across jobs */
+  // Management of already-scheduled workflows across jobs
   private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
   private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
       .newSingleThreadScheduledExecutor();
 
-  /** For connection management */
+  // For connection management
   private HelixManager _manager;
 
   /**
@@ -129,12 +131,6 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
 
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName);
-    if (!isReady) {
-      return emptyAssignment(resourceName);
-    }
-
     // Initialize workflow context if needed
     if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
@@ -145,7 +141,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
       if (workflowCtx.getJobState(parent) == null
           || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
-        return emptyAssignment(resourceName);
+        return emptyAssignment(resourceName, currStateOutput);
       }
     }
 
@@ -153,7 +149,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
     }
 
     // Check if this workflow has been finished past its expiry.
@@ -161,7 +157,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
         && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
       markForDeletion(_manager, workflowResource);
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
     }
 
     // Fetch any existing context information from the property store.
@@ -174,9 +170,17 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     // The job is already in a final state (completed/failed).
     if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
         || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
-      return emptyAssignment(resourceName);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady =
+        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
+    if (!isReady) {
+      return emptyAssignment(resourceName, currStateOutput);
     }
 
+    // Grab the old assignment, or an empty one if it doesn't exist
     ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
     if (prevAssignment == null) {
       prevAssignment = new ResourceAssignment(ResourceId.from(resourceName));
@@ -359,8 +363,9 @@ public abstract class TaskRebalancer implements HelixRebalancer {
             if (!successOptional) {
               workflowCtx.setJobState(jobResource, TaskState.FAILED);
               workflowCtx.setWorkflowState(TaskState.FAILED);
+              workflowCtx.setFinishTime(System.currentTimeMillis());
               addAllPartitions(allPartitions, partitionsToDropFromIs);
-              return emptyAssignment(jobResource);
+              return emptyAssignment(jobResource, currStateOutput);
             } else {
               skippedPartitions.add(pId);
               partitionsToDropFromIs.add(pId);
@@ -443,12 +448,14 @@ public abstract class TaskRebalancer implements HelixRebalancer {
   /**
    * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
    * @param workflowCfg the workflow to check
+   * @param workflowCtx the current workflow context
    * @param workflowResource the Helix resource associated with the workflow
    * @param jobResource a job from the workflow
+   * @param cache the current snapshot of the cluster
    * @return true if ready, false if not ready
    */
-  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource,
-      String jobResource) {
+  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      String workflowResource, String jobResource, Cluster cache) {
     // Ignore non-scheduled workflows
     if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
       return true;
@@ -457,11 +464,66 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     // Figure out when this should be run, and if it's ready, then just run it
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     Date startTime = scheduleConfig.getStartTime();
-    long delay = startTime.getTime() - new Date().getTime();
-    if (delay <= 0) {
-      SCHEDULED_WORKFLOWS.remove(workflowResource);
-      SCHEDULED_WORKFLOWS.inverse().remove(startTime);
-      return true;
+    long currentTime = new Date().getTime();
+    long delayFromStart = startTime.getTime() - currentTime;
+
+    if (delayFromStart <= 0) {
+      // Remove any timers that are past-time for this workflow
+      Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource);
+      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+        SCHEDULED_WORKFLOWS.remove(workflowResource);
+      }
+
+      // Recurring workflows are just templates that spawn new workflows
+      if (scheduleConfig.isRecurring()) {
+        // Skip scheduling this workflow if it's not in a start state
+        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          return false;
+        }
+
+        // Skip scheduling this workflow again if the previous run (if any) is still active
+        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+        if (lastScheduled != null) {
+          WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
+          if (lastWorkflowCtx == null
+              || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            return false;
+          }
+        }
+
+        // Figure out how many jumps are needed, thus the time to schedule the next workflow
+        // The negative of the delay is the amount of time past the start time
+        long period =
+            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+        long offsetMultiplier = (-delayFromStart) / period;
+        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+        // Now clone the workflow if this clone has not yet been created
+        String newWorkflowName =
+            workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
+        if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) {
+          Workflow clonedWf =
+              TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
+                  timeToSchedule));
+          TaskDriver driver = new TaskDriver(_manager);
+          try {
+            // Start the cloned workflow
+            driver.start(clonedWf);
+          } catch (Exception e) {
+            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+          }
+          // Persist workflow start regardless of success to avoid retrying and failing
+          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+          TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+        }
+
+        // Change the time to trigger the pipeline to that of the next run
+        startTime = new Date(timeToSchedule + period);
+        delayFromStart = startTime.getTime() - System.currentTimeMillis();
+      } else {
+        // This is a one-time workflow and is ready
+        return true;
+      }
     }
 
     // No need to schedule the same runnable at the same time
@@ -470,11 +532,22 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       return false;
     }
 
+    scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
+    return false;
+  }
+
+  private void scheduleRebalance(String workflowResource, String jobResource, Date startTime,
+      long delayFromStart) {
+    // No need to schedule the same runnable at the same time
+    if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
+        || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+      return;
+    }
+
     // For workflows not yet scheduled, schedule them and record it
     RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
     SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
-    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
-    return false;
+    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -620,8 +693,21 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     }
   }
 
-  private static ResourceAssignment emptyAssignment(String name) {
-    return new ResourceAssignment(ResourceId.from(name));
+  private static ResourceAssignment emptyAssignment(String name,
+      ResourceCurrentState currStateOutput) {
+    ResourceId resourceId = ResourceId.from(name);
+    ResourceAssignment assignment = new ResourceAssignment(resourceId);
+    Set<PartitionId> partitions = currStateOutput.getCurrentStateMappedPartitions(resourceId);
+    for (PartitionId partition : partitions) {
+      Map<ParticipantId, State> currentStateMap =
+          currStateOutput.getCurrentStateMap(resourceId, partition);
+      Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        replicaMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+    }
+    return assignment;
   }
 
   private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
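
To make the period-jump arithmetic in scheduleIfNotReady concrete, a self-contained sketch that reuses the variable names from the diff; the timestamps are made up:

    import java.util.concurrent.TimeUnit;

    public class RecurrenceMathSketch {
      public static void main(String[] args) {
        long startTime = 0L;                          // configured start of the recurring template
        long period = TimeUnit.HOURS.toMillis(1);     // recurrence interval in milliseconds
        long currentTime = 3 * period + 42000L;       // somewhere inside the fourth interval

        long delayFromStart = startTime - currentTime;               // negative once the start has passed
        long offsetMultiplier = (-delayFromStart) / period;          // 3 whole periods have elapsed
        long timeToSchedule = period * offsetMultiplier + startTime; // start of the current interval
        long nextRun = timeToSchedule + period;                      // when the next clone is due

        // Prints 3, the current interval start, and the next trigger time
        System.out.println(offsetMultiplier + " " + timeToSchedule + " " + nextRun);
      }
    }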

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 43a1741..b8582b1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -20,10 +20,14 @@ package org.apache.helix.task;
  */
 
 import java.io.IOException;
+import java.text.ParseException;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
@@ -44,6 +48,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -63,6 +68,9 @@ public class TaskUtil {
    */
   public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
     HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+    if (jobResourceConfig == null) {
+      return null;
+    }
     JobConfig.Builder b =
         JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
     Map<String, Map<String, String>> rawTaskConfigMap =
@@ -76,13 +84,33 @@ public class TaskUtil {
     return b.build();
   }
 
+  /**
+   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   * @param manager Helix manager object used to connect to Helix.
+   * @param workflowResource The name of the workflow resource.
+   * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
+   *         workflow, null otherwise.
+   */
   public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
     Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+    if (workflowCfg == null) {
+      return null;
+    }
     WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
 
     return b.build();
   }
 
+  /**
+   * Request a state change for a specific task.
+   * @param accessor connected Helix data accessor
+   * @param instance the instance serving the task
+   * @param sessionId the current session of the instance
+   * @param resource the job name
+   * @param partition the task partition name
+   * @param state the requested state
+   * @return true if the request was persisted, false otherwise
+   */
   public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
       String sessionId, String resource, String partition, TaskPartitionState state) {
     LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
@@ -101,11 +129,23 @@ public class TaskUtil {
     }
   }
 
+  /**
+   * Get a Helix configuration scope at a resource (i.e. job or workflow) level
+   * @param clusterName the cluster containing the resource
+   * @param resource the resource name
+   * @return instantiated {@link HelixConfigScope}
+   */
   public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
     return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
         .forCluster(clusterName).forResource(resource).build();
   }
 
+  /**
+   * Get the last task assignment for a given job
+   * @param manager a connection to Helix
+   * @param resourceName the name of the job
+   * @return {@link ResourceAssignment} instance, or null if no assignment is available
+   */
   public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
       String resourceName) {
     ZNRecord r =
@@ -115,6 +155,12 @@ public class TaskUtil {
     return r != null ? new ResourceAssignment(r) : null;
   }
 
+  /**
+   * Set the last task assignment for a given job
+   * @param manager a connection to Helix
+   * @param resourceName the name of the job
+   * @param ra {@link ResourceAssignment} containing the task assignment
+   */
   public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
       ResourceAssignment ra) {
     manager.getHelixPropertyStore().set(
@@ -122,6 +168,12 @@ public class TaskUtil {
         ra.getRecord(), AccessOption.PERSISTENT);
   }
 
+  /**
+   * Get the runtime context of a single job
+   * @param manager a connection to Helix
+   * @param jobResource the name of the job
+   * @return the {@link JobContext}, or null if none is available
+   */
   public static JobContext getJobContext(HelixManager manager, String jobResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
@@ -130,12 +182,24 @@ public class TaskUtil {
     return r != null ? new JobContext(r) : null;
   }
 
+  /**
+   * Set the runtime context of a single job
+   * @param manager a connection to Helix
+   * @param jobResource the name of the job
+   * @param ctx the up-to-date {@link JobContext} for the job
+   */
   public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
     manager.getHelixPropertyStore().set(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
         ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
+  /**
+   * Get the runtime context of a single workflow
+   * @param manager a connection to Helix
+   * @param workflowResource the name of the workflow
+   * @return the {@link WorkflowContext}, or null if none is available
+   */
   public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
@@ -144,6 +208,12 @@ public class TaskUtil {
     return r != null ? new WorkflowContext(r) : null;
   }
 
+  /**
+   * Set the runtime context of a single workflow
+   * @param manager a connection to Helix
+   * @param workflowResource the name of the workflow
+   * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+   */
   public static void setWorkflowContext(HelixManager manager, String workflowResource,
       WorkflowContext ctx) {
     manager.getHelixPropertyStore().set(
@@ -151,14 +221,45 @@ public class TaskUtil {
         ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
+  /**
+   * Get a workflow-qualified job name for a single-job workflow
+   * @param singleJobWorkflow the name of the single-job workflow
+   * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
+   */
   public static String getNamespacedJobName(String singleJobWorkflow) {
     return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
   }
 
+  /**
+   * Get a workflow-qualified job name for a job in that workflow
+   * @param workflowResource the name of the workflow
+   * @param jobName the un-namespaced name of the job
+   * @return The namespaced job name, which is just workflowResource_jobName
+   */
   public static String getNamespacedJobName(String workflowResource, String jobName) {
     return workflowResource + "_" + jobName;
   }
 
+  /**
+   * Remove the workflow namespace from the job name
+   * @param workflowResource the name of the workflow that owns the job
+   * @param jobName the namespaced job name
+   * @return the denamespaced job name, or the same job name if it is already denamespaced
+   */
+  public static String getDenamespacedJobName(String workflowResource, String jobName) {
+    if (jobName.contains(workflowResource)) {
+      // skip the entire length of the workflow name plus the underscore
+      return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
+    } else {
+      return jobName;
+    }
+  }
+
+  /**
+   * Serialize a map of job-level configurations as a single string
+   * @param commandConfig map of job config key to config value
+   * @return serialized string
+   */
   public static String serializeJobConfigMap(Map<String, String> commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -170,6 +271,11 @@ public class TaskUtil {
     return null;
   }
 
+  /**
+   * Deserialize a single string into a map of job-level configurations
+   * @param commandConfig the serialized job config map
+   * @return a map of job config key to config value
+   */
   public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
     ObjectMapper mapper = new ObjectMapper();
     try {
@@ -194,6 +300,111 @@ public class TaskUtil {
     accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
   }
 
+  /**
+   * Get a ScheduleConfig from a workflow config string map
+   * @param cfg the string map
+   * @return a ScheduleConfig if one exists, otherwise null
+   */
+  public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
+    // Parse schedule-specific configs, if they exist
+    Date startTime = null;
+    if (cfg.containsKey(WorkflowConfig.START_TIME)) {
+      try {
+        startTime = WorkflowConfig.DEFAULT_DATE_FORMAT.parse(cfg.get(WorkflowConfig.START_TIME));
+      } catch (ParseException e) {
+        LOG.error("Unparseable date " + cfg.get(WorkflowConfig.START_TIME), e);
+        return null;
+      }
+    }
+    if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT)
+        && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
+      return ScheduleConfig.recurringFromDate(startTime,
+          TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
+          Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
+    } else if (startTime != null) {
+      return ScheduleConfig.oneTimeDelayedStart(startTime);
+    }
+    return null;
+  }
+
+  /**
+   * Create a new workflow based on an existing one
+   * @param manager connection to Helix
+   * @param origWorkflowName the name of the existing workflow
+   * @param newWorkflowName the name of the new workflow
+   * @param newStartTime optional start time for the new workflow; if non-null, it replaces the schedule of the original
+   * @return the cloned workflow, or null if there was a problem cloning the existing one
+   */
+  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
+      String newWorkflowName, Date newStartTime) {
+    // Read all resources, including the workflow and jobs of interest
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    if (!resourceConfigMap.containsKey(origWorkflowName)) {
+      LOG.error("No such workflow named " + origWorkflowName);
+      return null;
+    }
+    if (resourceConfigMap.containsKey(newWorkflowName)) {
+      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
+      return null;
+    }
+
+    // Create a new workflow with a new name
+    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
+    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
+    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
+    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
+    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
+
+    // Set the workflow expiry
+    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));
+
+    // Set the schedule, if applicable
+    ScheduleConfig scheduleConfig;
+    if (newStartTime != null) {
+      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
+    } else {
+      scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
+    }
+    if (scheduleConfig != null) {
+      builder.setScheduleConfig(scheduleConfig);
+    }
+
+    // Add each job back as long as the original exists
+    Set<String> namespacedJobs = jobDag.getAllNodes();
+    for (String namespacedJob : namespacedJobs) {
+      if (resourceConfigMap.containsKey(namespacedJob)) {
+        // Copy over job-level and task-level configs
+        String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
+        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
+        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
+        jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
+        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
+          builder.addConfig(job, e.getKey(), e.getValue());
+        }
+        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
+        List<TaskConfig> taskConfigs = Lists.newLinkedList();
+        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+          taskConfigs.add(taskConfig);
+        }
+        builder.addTaskConfigs(job, taskConfigs);
+
+        // Add dag dependencies
+        Set<String> children = parentsToChildren.get(namespacedJob);
+        if (children != null) {
+          for (String namespacedChild : children) {
+            String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
+            builder.addParentChildDependency(job, child);
+          }
+        }
+      }
+    }
+    return builder.build();
+  }
+
   private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
     HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
     ConfigAccessor configAccessor = manager.getConfigAccessor();
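
For reference, a sketch of the simple-field map that parseScheduleFromConfigMap expects, assuming the WorkflowConfig constants and DEFAULT_DATE_FORMAT referenced above are accessible to the caller; the start time and interval values are illustrative:

    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.apache.helix.task.ScheduleConfig;
    import org.apache.helix.task.TaskUtil;
    import org.apache.helix.task.WorkflowConfig;

    public class ScheduleParsingSketch {
      public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<String, String>();
        cfg.put(WorkflowConfig.START_TIME,
            WorkflowConfig.DEFAULT_DATE_FORMAT.format(new Date()));
        cfg.put(WorkflowConfig.RECURRENCE_UNIT, TimeUnit.HOURS.name());
        cfg.put(WorkflowConfig.RECURRENCE_INTERVAL, "12");

        // Both recurrence keys present -> recurring schedule; START_TIME alone would
        // yield a one-time delayed start; no schedule keys at all -> null
        ScheduleConfig schedule = TaskUtil.parseScheduleFromConfigMap(cfg);
        System.out.println(schedule != null && schedule.isRecurring());
      }
    }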

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index fef0274..320c020 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -221,6 +221,7 @@ public class Workflow {
     if (wf.schedule != null) {
       builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
     }
+    builder.setExpiry(wf.expiry);
 
     return builder.build();
   }
@@ -267,7 +268,7 @@ public class Workflow {
       _dag = new JobDag();
       _jobConfigs = new TreeMap<String, Map<String, String>>();
       _taskConfigs = new TreeMap<String, List<TaskConfig>>();
-      _expiry = -1;
+      _expiry = WorkflowConfig.DEFAULT_EXPIRY;
     }
 
     public Builder addConfig(String job, String key, String val) {
@@ -345,7 +346,6 @@ public class Workflow {
       if (_expiry > 0) {
         builder.setExpiry(_expiry);
       }
-
       return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
                                                                               // internally
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index a8aff1f..782c375 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,20 +19,14 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Map;
 import java.util.TimeZone;
 
-import org.apache.log4j.Logger;
-
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
  */
 public class WorkflowConfig {
-  private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
-
   /* Config fields */
   public static final String DAG = "Dag";
   public static final String TARGET_STATE = "TargetState";
@@ -50,10 +44,10 @@ public class WorkflowConfig {
   }
 
   /* Member variables */
-  private JobDag _jobDag;
-  private TargetState _targetState;
-  private long _expiry;
-  private ScheduleConfig _scheduleConfig;
+  private final JobDag _jobDag;
+  private final TargetState _targetState;
+  private final long _expiry;
+  private final ScheduleConfig _scheduleConfig;
 
   private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry,
       ScheduleConfig scheduleConfig) {
@@ -85,10 +79,6 @@ public class WorkflowConfig {
     private long _expiry = DEFAULT_EXPIRY;
     private ScheduleConfig _scheduleConfig;
 
-    public Builder() {
-      // Nothing to do
-    }
-
     public WorkflowConfig build() {
       validate();
 
@@ -117,11 +107,9 @@ public class WorkflowConfig {
 
     public static Builder fromMap(Map<String, String> cfg) {
       Builder b = new Builder();
-
       if (cfg == null) {
         return b;
       }
-
       if (cfg.containsKey(EXPIRY)) {
         b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
       }
@@ -133,22 +121,9 @@ public class WorkflowConfig {
       }
 
       // Parse schedule-specific configs, if they exist
-      Date startTime = null;
-      if (cfg.containsKey(START_TIME)) {
-        try {
-          startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME));
-        } catch (ParseException e) {
-          LOG.error("Unparseable date " + cfg.get(START_TIME), e);
-        }
-      }
-      if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) {
-        /*
-         * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime,
-         * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)),
-         * Long.parseLong(cfg.get(RECURRENCE_INTERVAL))));
-         */
-      } else if (startTime != null) {
-        b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime));
+      ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);
+      if (scheduleConfig != null) {
+        b.setScheduleConfig(scheduleConfig);
       }
       return b;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 4feda1b..6ad71a1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -34,6 +34,7 @@ public class WorkflowContext extends HelixProperty {
   public static final String START_TIME = "START_TIME";
   public static final String FINISH_TIME = "FINISH_TIME";
   public static final String TASK_STATES = "TASK_STATES";
+  public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW";
   public static final int UNFINISHED = -1;
 
   public WorkflowContext(ZNRecord record) {
@@ -106,4 +107,12 @@ public class WorkflowContext extends HelixProperty {
 
     return Long.parseLong(tStr);
   }
+
+  public void setLastScheduledSingleWorkflow(String wf) {
+    _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf);
+  }
+
+  public String getLastScheduledSingleWorkflow() {
+    return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0f79187d/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 2ea23c7..a59e818 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -21,12 +21,14 @@ package org.apache.helix.task.beans;
 
 import java.util.List;
 
+import org.apache.helix.task.WorkflowConfig;
+
 /**
  * Bean class used for parsing workflow definitions from YAML.
  */
 public class WorkflowBean {
   public String name;
-  public String expiry;
   public List<JobBean> jobs;
   public ScheduleBean schedule;
+  public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
 }