You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:28 UTC

[45/50] [abbrv] git commit: [HELIX-440] One-time scheduling for task framework

[HELIX-440] One-time scheduling for task framework


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/346d8a32
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/346d8a32
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/346d8a32

Branch: refs/heads/master
Commit: 346d8a32ed91db9ce182d5cea911769a23654d0b
Parents: 0272e37
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jun 5 09:37:31 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 9 09:36:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/ScheduleConfig.java   | 165 +++++++++++++++++++
 .../org/apache/helix/task/TaskRebalancer.java   |  74 +++++++++
 .../java/org/apache/helix/task/TaskUtil.java    |  12 ++
 .../java/org/apache/helix/task/Workflow.java    |  33 ++++
 .../org/apache/helix/task/WorkflowConfig.java   |  55 ++++++-
 .../apache/helix/task/beans/ScheduleBean.java   |  32 ++++
 .../apache/helix/task/beans/WorkflowBean.java   |   1 +
 .../task/TestIndependentTaskRebalancer.java     |  34 ++++
 8 files changed, 404 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
new file mode 100644
index 0000000..9e3801e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java
@@ -0,0 +1,165 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.task.beans.ScheduleBean;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for scheduling both one-time and recurring workflows in Helix
+ */
+public class ScheduleConfig {
+  private static final Logger LOG = Logger.getLogger(ScheduleConfig.class);
+
+  /** Enforce that a workflow can recur at most once per minute */
+  private static final long MIN_RECURRENCE_MILLIS = 60 * 1000;
+
+  private final Date _startTime;
+  private final TimeUnit _recurUnit;
+  private final Long _recurInterval;
+
+  private ScheduleConfig(Date startTime, TimeUnit recurUnit, Long recurInterval) {
+    _startTime = startTime;
+    _recurUnit = recurUnit;
+    _recurInterval = recurInterval;
+  }
+
+  /**
+   * When the workflow should be started
+   * @return Date object representing the start time
+   */
+  public Date getStartTime() {
+    return _startTime;
+  }
+
+  /**
+   * The unit of the recurrence interval if this is a recurring workflow
+   * @return the recurrence interval unit, or null if this workflow is a one-time workflow
+   */
+  public TimeUnit getRecurrenceUnit() {
+    return _recurUnit;
+  }
+
+  /**
+   * The magnitude of the recurrence interval if this is a recurring workflow
+   * @return the recurrence interval magnitude, or null if this workflow is a one-time workflow
+   */
+  public Long getRecurrenceInterval() {
+    return _recurInterval;
+  }
+
+  /**
+   * Check if this workflow is recurring
+   * @return true if recurring, false if one-time
+   */
+  public boolean isRecurring() {
+    return _recurUnit != null && _recurInterval != null;
+  }
+
+  /**
+   * Check if the configured schedule is valid given these constraints:
+   * <ul>
+   * <li>All workflows must have a start time</li>
+   * <li>Recurrence unit and interval must both be present if either is present</li>
+   * <li>Recurring workflows must have a positive interval magnitude</li>
+   * <li>Intervals must be at least one minute</li>
+   * </ul>
+   * @return true if valid, false if invalid (recurring schedules are currently always invalid)
+   */
+  public boolean isValid() {
+    // For now, disallow recurring workflows
+    if (isRecurring()) {
+      LOG.error("Recurring workflows are not currently supported.");
+      return false;
+    }
+
+    // All schedules must have a start time even if they are recurring
+    if (_startTime == null) {
+      LOG.error("All schedules must have a start time!");
+      return false;
+    }
+
+    // Recurrence properties must either both be present or both be absent
+    if ((_recurUnit == null && _recurInterval != null)
+        || (_recurUnit != null && _recurInterval == null)) {
+      LOG.error("Recurrence interval and unit must either both be present or both be absent");
+      return false;
+    }
+
+    // Only positive recurrence intervals are allowed if present
+    if (_recurInterval != null && _recurInterval <= 0) {
+      LOG.error("Recurrence interval must be positive");
+      return false;
+    }
+
+    // Enforce minimum interval length
+    if (_recurUnit != null) {
+      long converted = _recurUnit.toMillis(_recurInterval);
+      if (converted < MIN_RECURRENCE_MILLIS) {
+        LOG.error("Recurrence must be at least " + MIN_RECURRENCE_MILLIS + " ms");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Create this configuration from a serialized bean
+   * @param bean flat configuration of the schedule
+   * @return instantiated ScheduleConfig
+   */
+  public static ScheduleConfig from(ScheduleBean bean) {
+    return new ScheduleConfig(bean.startTime, bean.recurUnit, bean.recurInterval);
+  }
+
+  /**
+   * Create a schedule for a workflow that runs once at a specified time
+   * @param startTime the time to start the workflow
+   * @return instantiated ScheduleConfig
+   */
+  public static ScheduleConfig oneTimeDelayedStart(Date startTime) {
+    return new ScheduleConfig(startTime, null, null);
+  }
+
+  /*
+   * Create a schedule for a recurring workflow that should start immediately
+   * @param recurUnit the unit of the recurrence interval
+   * @param recurInterval the magnitude of the recurrence interval
+   * @return instantiated ScheduleConfig
+   * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) {
+   * return new ScheduleConfig(new Date(), recurUnit, recurInterval);
+   * }
+   */
+
+  /*
+   * Create a schedule for a recurring workflow that should start at a specific time
+   * @param startTime the time to start the workflow the first time
+   * @param recurUnit the unit of the recurrence interval
+   * @param recurInterval the magnitude of the recurrence interval
+   * @return instantiated ScheduleConfig
+   * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit,
+   * long recurInterval) {
+   * return new ScheduleConfig(startTime, recurUnit, recurInterval);
+   * }
+   */
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 043e7dd..37c8548 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.helix.task;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -50,6 +54,8 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
@@ -58,6 +64,13 @@ import com.google.common.collect.Sets;
  */
 public abstract class TaskRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+
+  /** Management of already-scheduled workflows across jobs */
+  private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create();
+  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor();
+
+  /** For connection management */
   private HelixManager _manager;
 
   /**
@@ -116,6 +129,12 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
 
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName);
+    if (!isReady) {
+      return emptyAssignment(resourceName);
+    }
+
     // Initialize workflow context if needed
     if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
@@ -422,6 +441,43 @@ public abstract class TaskRebalancer implements HelixRebalancer {
   }
 
   /**
+   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
+   * @param workflowCfg the workflow to check
+   * @param workflowResource the Helix resource associated with the workflow
+   * @param jobResource a job from the workflow
+   * @return true if ready, false if not ready
+   */
+  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource,
+      String jobResource) {
+    // Ignore non-scheduled workflows
+    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+      return true;
+    }
+
+    // Figure out when this should be run, and if it's ready, then just run it
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    Date startTime = scheduleConfig.getStartTime();
+    long delay = startTime.getTime() - new Date().getTime();
+    if (delay <= 0) {
+      SCHEDULED_WORKFLOWS.remove(workflowResource);
+      SCHEDULED_WORKFLOWS.inverse().remove(startTime);
+      return true;
+    }
+
+    // No need to schedule the same runnable at the same time
+    if (SCHEDULED_WORKFLOWS.containsKey(workflowResource)
+        || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) {
+      return false;
+    }
+
+    // For workflows not yet scheduled, schedule them and record it
+    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
+    SCHEDULED_WORKFLOWS.put(workflowResource, startTime);
+    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
+    return false;
+  }
+
+  /**
    * Checks if the job has completed.
    * @param ctx The rebalancer context.
    * @param allPartitions The set of partitions to check.
@@ -660,4 +716,22 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       _state = state;
     }
   }
+
+  /**
+   * The simplest possible runnable that will trigger a run of the controller pipeline
+   */
+  private static class RebalanceInvoker implements Runnable {
+    private final HelixManager _manager;
+    private final String _resource;
+
+    public RebalanceInvoker(HelixManager manager, String resource) {
+      _manager = manager;
+      _resource = resource;
+    }
+
+    @Override
+    public void run() {
+      TaskUtil.invokeRebalance(_manager, _resource);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 96b7e55..43a1741 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -36,6 +36,7 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
@@ -182,6 +183,17 @@ public class TaskUtil {
     return Collections.emptyMap();
   }
 
+  /**
+   * Trigger a controller pipeline execution for a given resource.
+   * @param manager Helix connection
+   * @param resource the name of the resource whose ideal state is touched to trigger the execution
+   */
+  public static void invokeRebalance(HelixManager manager, String resource) {
+    // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
+  }
+
   private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
     HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
     ConfigAccessor configAccessor = manager.getConfigAccessor();

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 70fb82c..fef0274 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -27,6 +27,7 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,12 +83,31 @@ public class Workflow {
     return _taskConfigs;
   }
 
+  public WorkflowConfig getWorkflowConfig() {
+    return _workflowConfig;
+  }
+
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
 
+    // Populate schedule if present
+    ScheduleConfig scheduleConfig = _workflowConfig.getScheduleConfig();
+    if (scheduleConfig != null) {
+      Date startTime = scheduleConfig.getStartTime();
+      if (startTime != null) {
+        String formattedTime = WorkflowConfig.DEFAULT_DATE_FORMAT.format(startTime);
+        cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
+      }
+      if (scheduleConfig.isRecurring()) {
+        cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
+        cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
+            .toString());
+      }
+    }
+
     return cfgMap;
   }
 
@@ -198,6 +218,10 @@ public class Workflow {
       }
     }
 
+    if (wf.schedule != null) {
+      builder.setScheduleConfig(ScheduleConfig.from(wf.schedule));
+    }
+
     return builder.build();
   }
 
@@ -235,6 +259,7 @@ public class Workflow {
     private JobDag _dag;
     private Map<String, Map<String, String>> _jobConfigs;
     private Map<String, List<TaskConfig>> _taskConfigs;
+    private ScheduleConfig _scheduleConfig;
     private long _expiry;
 
     public Builder(String name) {
@@ -291,6 +316,11 @@ public class Workflow {
       return this;
     }
 
+    public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+      _scheduleConfig = scheduleConfig;
+      return this;
+    }
+
     public Builder setExpiry(long expiry) {
       _expiry = expiry;
       return this;
@@ -309,6 +339,9 @@ public class Workflow {
       WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
       builder.setTaskDag(_dag);
       builder.setTargetState(TargetState.START);
+      if (_scheduleConfig != null) {
+        builder.setScheduleConfig(_scheduleConfig);
+      }
       if (_expiry > 0) {
         builder.setExpiry(_expiry);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index ff4a2a9..a8aff1f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -19,29 +19,48 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
 
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
  */
 public class WorkflowConfig {
+  private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
+
   /* Config fields */
   public static final String DAG = "Dag";
   public static final String TARGET_STATE = "TargetState";
   public static final String EXPIRY = "Expiry";
+  public static final String START_TIME = "StartTime";
+  public static final String RECURRENCE_UNIT = "RecurrenceUnit";
+  public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+  public static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat(
+      "MM-dd-yyyy HH:mm:ss");
+  static {
+    DEFAULT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
 
   /* Member variables */
   private JobDag _jobDag;
   private TargetState _targetState;
   private long _expiry;
+  private ScheduleConfig _scheduleConfig;
 
-  private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) {
+  private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry,
+      ScheduleConfig scheduleConfig) {
     _jobDag = jobDag;
     _targetState = targetState;
     _expiry = expiry;
+    _scheduleConfig = scheduleConfig;
   }
 
   public JobDag getJobDag() {
@@ -56,10 +75,15 @@ public class WorkflowConfig {
     return _expiry;
   }
 
+  public ScheduleConfig getScheduleConfig() {
+    return _scheduleConfig;
+  }
+
   public static class Builder {
     private JobDag _taskDag = JobDag.EMPTY_DAG;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
+    private ScheduleConfig _scheduleConfig;
 
     public Builder() {
       // Nothing to do
@@ -68,7 +92,7 @@ public class WorkflowConfig {
     public WorkflowConfig build() {
       validate();
 
-      return new WorkflowConfig(_taskDag, _targetState, _expiry);
+      return new WorkflowConfig(_taskDag, _targetState, _expiry, _scheduleConfig);
     }
 
     public Builder setTaskDag(JobDag v) {
@@ -86,6 +110,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setScheduleConfig(ScheduleConfig scheduleConfig) {
+      _scheduleConfig = scheduleConfig;
+      return this;
+    }
+
     public static Builder fromMap(Map<String, String> cfg) {
       Builder b = new Builder();
 
@@ -103,6 +132,24 @@ public class WorkflowConfig {
         b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
       }
 
+      // Parse schedule-specific configs, if they exist
+      Date startTime = null;
+      if (cfg.containsKey(START_TIME)) {
+        try {
+          startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME));
+        } catch (ParseException e) {
+          LOG.error("Unparseable date " + cfg.get(START_TIME), e);
+        }
+      }
+      if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) {
+        /*
+         * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime,
+         * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)),
+         * Long.parseLong(cfg.get(RECURRENCE_INTERVAL))));
+         */
+      } else if (startTime != null) {
+        b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime));
+      }
       return b;
     }
 
@@ -110,6 +157,10 @@ public class WorkflowConfig {
       if (_expiry < 0) {
         throw new IllegalArgumentException(
             String.format("%s has invalid value %s", EXPIRY, _expiry));
+      } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) {
+        throw new IllegalArgumentException(
+            "Scheduler configuration is invalid. The configuration must have a start time if it is "
+                + "one-time, and it must have a positive interval magnitude if it is recurring");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
new file mode 100644
index 0000000..9e843f5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java
@@ -0,0 +1,32 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A bean representing how a workflow can be scheduled in Helix
+ */
+public class ScheduleBean {
+  public Date startTime;
+  public Long recurInterval;
+  public TimeUnit recurUnit;
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 76da4c8..2ea23c7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -28,4 +28,5 @@ public class WorkflowBean {
   public String name;
   public String expiry;
   public List<JobBean> jobs;
+  public ScheduleBean schedule;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/346d8a32/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 006c3fe..1196f41 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
@@ -44,7 +46,9 @@ import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskResult.Status;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -246,6 +250,36 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(_runCounts.values().contains(1));
   }
 
+  @Test
+  public void testOneTimeScheduled() throws Exception {
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    Map<String, String> taskConfigMap = Maps.newHashMap();
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
+    taskConfigs.add(taskConfig1);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
+    workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that the class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+
+    // Check that the workflow only started after the start time (with a 1 second buffer)
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, jobName);
+    long startTime = workflowCtx.getStartTime();
+    Assert.assertTrue((startTime + 1000) >= inFiveSeconds);
+  }
+
   private class TaskOne extends ReindexTask {
     private final boolean _shouldFail;
     private final String _instanceName;