You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/12/10 08:04:53 UTC

[1/3] helix git commit: [HELIX-617] Job IdealState is generated even when the job is not running and is not removed when it is completed.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 7bbb20be6 -> 1798e7935


http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 38c9113..79adcd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -24,13 +24,17 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -195,7 +199,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
-
   private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
@@ -299,9 +302,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
-            TaskState.COMPLETED);
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
@@ -417,54 +419,68 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
   public void testJobsDisableExternalView() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
+    ExternviewChecker externviewChecker = new ExternviewChecker();
+    _manager.addExternalViewChangeListener(externviewChecker);
+
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
-    // create jobs
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
 
-    JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-            .setTargetPartitionStates(Sets.newHashSet("SLAVE"))
-            .setDisableExternalView(true);
+    JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
 
-    JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-            .setTargetPartitionStates(Sets.newHashSet("MASTER"));
+    JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
 
     // enqueue both jobs
     queueBuilder.enqueueJob("job1", job1);
     queueBuilder.enqueueJob("job2", job2);
+    queueBuilder.enqueueJob("job3", job3);
 
     _driver.createQueue(queueBuilder.build());
 
-
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
-    // ensure job1 is started
-    String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
-
-    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    // verify external view for job does not exists
-    ExternalView externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob1));
-    Assert.assertNull(externalView, "External View for " + namedSpaceJob1 + " shoudld not exist!");
+    // ensure all jobs are completed
+    String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
 
-    // ensure job2 is completed
+    Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
+    String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, "job2");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
 
-    // verify external view for job does not exists
-    externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob2));
-    Assert.assertNotNull(externalView, "Can not find external View for " + namedSpaceJob2 + "!");
+    Assert.assertTrue(seenExternalViews.contains(namedSpaceJob1),
+        "Can not find external View for " + namedSpaceJob1 + "!");
+    Assert.assertTrue(!seenExternalViews.contains(namedSpaceJob2),
+        "External View for " + namedSpaceJob2 + " shoudld not exist!");
+    Assert.assertTrue(seenExternalViews.contains(namedSpaceJob3),
+        "Can not find external View for " + namedSpaceJob3 + "!");
+
+    _manager
+        .removeListener(new PropertyKey.Builder(CLUSTER_NAME).externalViews(), externviewChecker);
   }
 
+  private static class ExternviewChecker implements ExternalViewChangeListener {
+    private Set<String> _seenExternalViews = new HashSet<String>();
+
+    @Override public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      for (ExternalView view : externalViewList) {
+        _seenExternalViews.add(view.getResourceName());
+      }
+    }
+
+    public Set<String> getSeenExternalViews() {
+      return _seenExternalViews;
+    }
+  }
 
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f49f941..f402b82 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -185,7 +185,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Wait for job to finish and expire
     TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
-    _driver.invokeRebalance();
+    TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
     Thread.sleep(expiry);
 
     // Ensure workflow config and context were cleaned up by now


[3/3] helix git commit: [HELIX-617] Job IdealState is generated even when the job is not running and is not removed when it is completed.

Posted by ki...@apache.org.
[HELIX-617] Job IdealState is generated even when the job is not running and is not removed when it is completed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1798e793
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1798e793
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1798e793

Branch: refs/heads/helix-0.6.x
Commit: 1798e793522157b1b479a66c8a9ec9453d698b8f
Parents: 7bbb20b
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Dec 9 14:02:45 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Dec 9 16:39:37 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/model/IdealState.java |    8 +-
 .../FixedTargetTaskAssignmentCalculator.java    |  163 +++
 .../helix/task/FixedTargetTaskRebalancer.java   |  163 ---
 .../task/GenericTaskAssignmentCalculator.java   |  273 +++++
 .../helix/task/GenericTaskRebalancer.java       |  273 -----
 .../org/apache/helix/task/JobRebalancer.java    |  650 +++++++++++
 .../helix/task/TaskAssignmentCalculator.java    |   45 +
 .../java/org/apache/helix/task/TaskDriver.java  |  205 ++--
 .../org/apache/helix/task/TaskRebalancer.java   | 1045 +++---------------
 .../java/org/apache/helix/task/TaskUtil.java    |  216 ++--
 .../java/org/apache/helix/task/Workflow.java    |    6 +-
 .../apache/helix/task/WorkflowRebalancer.java   |  412 +++++++
 .../integration/task/TestRecurringJobQueue.java |   78 +-
 .../integration/task/TestTaskRebalancer.java    |    2 +-
 14 files changed, 1971 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 696de7a..e7f6096 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -31,9 +31,9 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.task.FixedTargetTaskRebalancer;
-import org.apache.helix.task.GenericTaskRebalancer;
+import org.apache.helix.task.JobRebalancer;
 import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowRebalancer;
 import org.apache.log4j.Logger;
 
 /**
@@ -524,8 +524,8 @@ public class IdealState extends HelixProperty {
     default:
       String rebalancerName = getRebalancerClassName();
       if (rebalancerName != null) {
-        if (rebalancerName.equals(FixedTargetTaskRebalancer.class.getName())
-            || rebalancerName.equals(GenericTaskRebalancer.class.getName())) {
+        if (rebalancerName.equals(JobRebalancer.class.getName())
+            || rebalancerName.equals(WorkflowRebalancer.class.getName())) {
           property = RebalanceMode.TASK;
         } else {
           property = RebalanceMode.USER_DEFINED;

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
new file mode 100644
index 0000000..8760524
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -0,0 +1,163 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
+ * resource. Here, tasks are co-located according to where a resource's partitions are, as well as
+ * (if desired) only where those partitions are in a given state.
+ */
+public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  }
+
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+    if (tgtIs == null) {
+      return Collections.emptyMap();
+    }
+    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
+    return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
+        jobContext);
+  }
+
+  /**
+   * Gets the ideal state of the target resource of this job
+   * @param jobCfg job config containing target resource id
+   * @param cache snapshot of the cluster containing the task and target resource
+   * @return target resource ideal state, or null
+   */
+  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+    String tgtResourceId = jobCfg.getTargetResource();
+    return cache.getIdealState(tgtResourceId);
+  }
+
+  /**
+   * Returns the set of all partition ids for a job.
+   * <p/>
+   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+   * use the list of all partition ids from the target resource.
+   */
+  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
+      JobContext taskCtx) {
+    if (tgtResourceIs == null) {
+      return null;
+    }
+    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
+    SortedSet<String> targetPartitions = Sets.newTreeSet();
+    if (jobCfg.getTargetPartitions() != null) {
+      targetPartitions.addAll(jobCfg.getTargetPartitions());
+    } else {
+      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+    }
+
+    Set<Integer> taskPartitions = Sets.newTreeSet();
+    for (String pName : targetPartitions) {
+      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
+    }
+    return taskPartitions;
+  }
+
+  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
+      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
+    if (!currentTargets.containsKey(targetPartition)) {
+      int nextId = jobCtx.getPartitionSet().size();
+      jobCtx.setPartitionTarget(nextId, targetPartition);
+      return Lists.newArrayList(nextId);
+    } else {
+      return currentTargets.get(targetPartition);
+    }
+  }
+
+  /**
+   * Get partition assignments for the target resource, but only for the partitions of interest.
+   * @param currStateOutput The current state of the instances in the cluster.
+   * @param instances The instances.
+   * @param tgtIs The ideal state of the target resource.
+   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+   *          do not need to
+   *          be in any specific state to be considered.
+   * @param includeSet The set of partitions to consider.
+   * @return A map of instance vs set of partition ids assigned to that instance.
+   */
+  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+      CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
+    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+    for (String instance : instances) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
+    for (String pName : tgtIs.getPartitionSet()) {
+      List<Integer> partitions = partitionsByTarget.get(pName);
+      if (partitions == null || partitions.size() < 1) {
+        continue;
+      }
+      int pId = partitions.get(0);
+      if (includeSet.contains(pId)) {
+        for (String instance : instances) {
+          Message pendingMessage =
+              currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName),
+                  instance);
+          if (pendingMessage != null) {
+            continue;
+          }
+          String s =
+              currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
+                  instance);
+          String state = (s == null ? null : s.toString());
+          if (tgtStates == null || tgtStates.contains(state)) {
+            result.get(instance).add(pId);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
deleted file mode 100644
index 4c013c0..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A rebalancer for when a task group must be assigned according to partitions/states on a target
- * resource. Here, tasks are colocated according to where a resource's partitions are, as well as
- * (if desired) only where those partitions are in a given state.
- */
-public class FixedTargetTaskRebalancer extends TaskRebalancer {
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
-    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
-    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
-    if (tgtIs == null) {
-      return Collections.emptyMap();
-    }
-    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
-    return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
-        jobContext);
-  }
-
-  /**
-   * Gets the ideal state of the target resource of this job
-   * @param jobCfg job config containing target resource id
-   * @param cluster snapshot of the cluster containing the task and target resource
-   * @return target resource ideal state, or null
-   */
-  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
-    String tgtResourceId = jobCfg.getTargetResource();
-    return cache.getIdealState(tgtResourceId);
-  }
-
-  /**
-   * Returns the set of all partition ids for a job.
-   * <p/>
-   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
-   * use the list of all partition ids from the target resource.
-   */
-  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
-      JobContext taskCtx) {
-    if (tgtResourceIs == null) {
-      return null;
-    }
-    Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
-    SortedSet<String> targetPartitions = Sets.newTreeSet();
-    if (jobCfg.getTargetPartitions() != null) {
-      targetPartitions.addAll(jobCfg.getTargetPartitions());
-    } else {
-      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
-    }
-
-    Set<Integer> taskPartitions = Sets.newTreeSet();
-    for (String pName : targetPartitions) {
-      taskPartitions.addAll(getPartitionsForTargetPartition(pName, currentTargets, taskCtx));
-    }
-    return taskPartitions;
-  }
-
-  private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
-      Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
-    if (!currentTargets.containsKey(targetPartition)) {
-      int nextId = jobCtx.getPartitionSet().size();
-      jobCtx.setPartitionTarget(nextId, targetPartition);
-      return Lists.newArrayList(nextId);
-    } else {
-      return currentTargets.get(targetPartition);
-    }
-  }
-
-  /**
-   * Get partition assignments for the target resource, but only for the partitions of interest.
-   * @param currStateOutput The current state of the instances in the cluster.
-   * @param instances The instances.
-   * @param tgtIs The ideal state of the target resource.
-   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
-   *          do not need to
-   *          be in any specific state to be considered.
-   * @param includeSet The set of partitions to consider.
-   * @return A map of instance vs set of partition ids assigned to that instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
-      CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instances) {
-      result.put(instance, new TreeSet<Integer>());
-    }
-
-    Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
-    for (String pName : tgtIs.getPartitionSet()) {
-      List<Integer> partitions = partitionsByTarget.get(pName);
-      if (partitions == null || partitions.size() < 1) {
-        continue;
-      }
-      int pId = partitions.get(0);
-      if (includeSet.contains(pId)) {
-        for (String instance : instances) {
-          Message pendingMessage =
-              currStateOutput.getPendingState(tgtIs.getResourceName(), new Partition(pName),
-                  instance);
-          if (pendingMessage != null) {
-            continue;
-          }
-          String s =
-              currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
-                  instance);
-          String state = (s == null ? null : s.toString());
-          if (tgtStates == null || tgtStates.contains(state)) {
-            result.get(instance).add(pId);
-          }
-        }
-      }
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
new file mode 100644
index 0000000..e8d5f5d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -0,0 +1,273 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
+ * assignment to target partitions and states of another resource
+ */
+public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
+  /** Reassignment policy for this algorithm */
+  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
+
+  @Override
+  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+    // Register each configured task that has no partition yet, numbering it by the current
+    // size of the job context's partition set
+    Map<String, Integer> knownTaskIds = jobCtx.getTaskIdPartitionMap();
+    for (TaskConfig task : jobCfg.getTaskConfigMap().values()) {
+      String id = task.getId();
+      if (!knownTaskIds.containsKey(id)) {
+        jobCtx.setTaskIdForPartition(jobCtx.getPartitionSet().size(), id);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
+
+  /** Assigns partitions via AutoRebalanceStrategy, then lets the retry policy shift them. */
+  @Override
+  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
+      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
+      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Set<Integer> partitionSet, ClusterDataCache cache) {
+    // Gather input to the full auto rebalancing algorithm
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    states.put("ONLINE", 1);
+
+    // Only map partitions whose assignment we care about
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name. prevAssignment supplies
+    // the resource name and is dereferenced unconditionally, so it must never be null.
+    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
+    Collections.sort(partitionNums);
+    final String resourceId = prevAssignment.getResourceName();
+    List<String> partitions =
+        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
+          @Override
+          public String apply(Integer partitionNum) {
+            return resourceId + "_" + partitionNum;
+          }
+        }));
+
+    // Compute the current assignment
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
+      if (!filteredPartitionSet.contains(TaskUtil.getPartitionId(partition.getPartitionName()))) {
+        // not computing old partitions
+        continue;
+      }
+      // Merge in increasing precedence: later putAll calls overwrite entries of earlier ones
+      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
+      allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
+      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
+    }
+
+    // Get the assignment keyed on partition
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
+            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    List<String> allNodes =
+        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
+    Collections.sort(allNodes);
+    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    Map<String, List<String>> preferenceLists = record.getListFields();
+
+    // Convert to an assignment keyed on participant
+    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
+      int partitionId = TaskUtil.getPartitionId(e.getKey());
+      List<String> preferenceList = e.getValue();
+      for (String participantName : preferenceList) {
+        if (!taskAssignment.containsKey(participantName)) {
+          taskAssignment.put(participantName, new TreeSet<Integer>());
+        }
+        taskAssignment.get(participantName).add(partitionId);
+      }
+    }
+
+    // Finally, adjust the assignment if tasks have been failing
+    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
+    return taskAssignment;
+  }
+
+  /**
+   * Filter a list of instances based on targeted resource policies
+   * @param jobCfg the job configuration
+   * @param currStateOutput the current state of all instances in the cluster
+   * @param instances valid instances
+   * @param cache current snapshot of the cluster
+   * @return a set of instances that can be assigned to
+   */
+  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
+      Iterable<String> instances, ClusterDataCache cache) {
+    // No target resource means any instance is available
+    Set<String> allInstances = Sets.newHashSet(instances);
+    String targetResource = jobCfg.getTargetResource();
+    if (targetResource == null) {
+      return allInstances;
+    }
+
+    // Bad ideal state means don't assign
+    IdealState idealState = cache.getIdealState(targetResource);
+    if (idealState == null) {
+      return Collections.emptySet();
+    }
+
+    // Copy before retainAll: getPartitionSet() may expose the cached ideal state's own key set
+    Set<String> partitions = Sets.newHashSet(idealState.getPartitionSet());
+    List<String> targetPartitions = jobCfg.getTargetPartitions();
+    if (targetPartitions != null && !targetPartitions.isEmpty()) {
+      partitions.retainAll(targetPartitions);
+    }
+
+    // Based on state matches, add eligible instances
+    Set<String> eligibleInstances = Sets.newHashSet();
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+    for (String partition : partitions) {
+      Map<String, String> stateMap =
+          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
+      Map<String, String> pendingStateMap =
+          currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
+      for (Map.Entry<String, String> e : stateMap.entrySet()) {
+        String instanceName = e.getKey();
+        String state = e.getValue();
+        String pending = pendingStateMap.get(instanceName);
+        if (pending != null) {
+          continue;
+        }
+        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
+          eligibleInstances.add(instanceName);
+        }
+      }
+    }
+    allInstances.retainAll(eligibleInstances);
+    return allInstances;
+  }
+
+  /** Strategy hook applied as the last step of getTaskAssignment. */
+  public interface RetryPolicy {
+    /**
+     * Adjust the assignment so a task that keeps failing where it is assigned can move elsewhere
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can serve tasks
+     * @param origAssignment the unmodified assignment, keyed by instance name
+     * @return the adjusted assignment, keyed by instance name
+     */
+    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
+  }
+
+  private static class DefaultRetryReassigner implements RetryPolicy {
+    @Override
+    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
+      // Number the instances in iteration order; the inverse view maps a number back to a name
+      BiMap<String, Integer> idByInstance = HashBiMap.create(instances.size());
+      int nextId = 0;
+      for (String name : instances) {
+        idByInstance.put(name, nextId++);
+      }
+      BiMap<Integer, String> instanceById = idByInstance.inverse();
+
+      // Rebuild the assignment, moving every partition by its own shift value
+      Map<String, SortedSet<Integer>> shifted = Maps.newHashMap();
+      for (Map.Entry<String, SortedSet<Integer>> entry : origAssignment.entrySet()) {
+        String owner = entry.getKey();
+        Integer ownerId = idByInstance.get(owner);
+        if (ownerId == null) {
+          // Instance without a number: just keep its previous assignment
+          shifted.put(owner, entry.getValue());
+          continue;
+        }
+        for (int p : entry.getValue()) {
+          // A partition whose attempts failed on its current instance is pushed to another one
+          int shift = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
+          String target = instanceById.get((ownerId + shift) % instances.size());
+          if (target == null) {
+            target = owner;
+          }
+          SortedSet<Integer> targetPartitions = shifted.get(target);
+          if (targetPartitions == null) {
+            targetPartitions = new TreeSet<Integer>();
+            shifted.put(target, targetPartitions);
+          }
+          targetPartitions.add(p);
+        }
+      }
+      return shifted;
+    }
+
+    /**
+     * In case tasks fail, we may not want to schedule them in the same place. This method computes
+     * a shifting value for choosing other instances; both divisors are clamped to 1 (no / by zero)
+     * @param jobCfg the job configuration
+     * @param jobCtx the job context
+     * @param instances instances that can be chosen
+     * @param p the partition to look up
+     * @return the shifting value
+     */
+    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
+        Collection<String> instances, int p) {
+      int numAttempts = jobCtx.getPartitionNumAttempts(p);
+      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
+      int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
+      return numAttempts / Math.max(1, maxNumAttempts / Math.max(1, numInstances));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
deleted file mode 100644
index f4145c5..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ /dev/null
@@ -1,273 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * This class does an assignment based on an automatic rebalancing strategy, rather than requiring
- * assignment to target partitions and states of another resource
- */
-public class GenericTaskRebalancer extends TaskRebalancer {
-  /** Reassignment policy for this algorithm */
-  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
-
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
-    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
-    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
-    for (TaskConfig taskCfg : taskMap.values()) {
-      String taskId = taskCfg.getId();
-      int nextPartition = jobCtx.getPartitionSet().size();
-      if (!taskIdMap.containsKey(taskId)) {
-        jobCtx.setTaskIdForPartition(nextPartition, taskId);
-      }
-    }
-    return jobCtx.getPartitionSet();
-  }
-
-  @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
-      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
-      final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
-    // Gather input to the full auto rebalancing algorithm
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
-    states.put("ONLINE", 1);
-
-    // Only map partitions whose assignment we care about
-    final Set<TaskPartitionState> honoredStates =
-        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
-            TaskPartitionState.STOPPED);
-    Set<Integer> filteredPartitionSet = Sets.newHashSet();
-    for (Integer p : partitionSet) {
-      TaskPartitionState state = (jobContext == null) ? null : jobContext.getPartitionState(p);
-      if (state == null || honoredStates.contains(state)) {
-        filteredPartitionSet.add(p);
-      }
-    }
-
-    // Transform from partition id to fully qualified partition name
-    List<Integer> partitionNums = Lists.newArrayList(partitionSet);
-    Collections.sort(partitionNums);
-    final String resourceId = prevAssignment.getResourceName();
-    List<String> partitions =
-        new ArrayList<String>(Lists.transform(partitionNums, new Function<Integer, String>() {
-          @Override
-          public String apply(Integer partitionNum) {
-            return resourceId + "_" + partitionNum;
-          }
-        }));
-
-    // Compute the current assignment
-    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
-    for (Partition partition : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
-      if (!filteredPartitionSet.contains(pId(partition.getPartitionName()))) {
-        // not computing old partitions
-        continue;
-      }
-      Map<String, String> allPreviousDecisionMap = Maps.newHashMap();
-      if (prevAssignment != null) {
-        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partition));
-      }
-      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partition));
-      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partition));
-      currentMapping.put(partition.getPartitionName(), allPreviousDecisionMap);
-    }
-
-    // Get the assignment keyed on partition
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
-            new AutoRebalanceStrategy.DefaultPlacementScheme());
-    List<String> allNodes =
-        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
-    Collections.sort(allNodes);
-    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
-    Map<String, List<String>> preferenceLists = record.getListFields();
-
-    // Convert to an assignment keyed on participant
-    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
-    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
-      String partitionName = e.getKey();
-      partitionName = String.valueOf(pId(partitionName));
-      List<String> preferenceList = e.getValue();
-      for (String participantName : preferenceList) {
-        if (!taskAssignment.containsKey(participantName)) {
-          taskAssignment.put(participantName, new TreeSet<Integer>());
-        }
-        taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
-      }
-    }
-
-    // Finally, adjust the assignment if tasks have been failing
-    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment);
-    return taskAssignment;
-  }
-
-  /**
-   * Filter a list of instances based on targeted resource policies
-   * @param jobCfg the job configuration
-   * @param currStateOutput the current state of all instances in the cluster
-   * @param instances valid instances
-   * @param cache current snapshot of the cluster
-   * @return a set of instances that can be assigned to
-   */
-  private Set<String> getEligibleInstances(JobConfig jobCfg, CurrentStateOutput currStateOutput,
-      Iterable<String> instances, ClusterDataCache cache) {
-    // No target resource means any instance is available
-    Set<String> allInstances = Sets.newHashSet(instances);
-    String targetResource = jobCfg.getTargetResource();
-    if (targetResource == null) {
-      return allInstances;
-    }
-
-    // Bad ideal state means don't assign
-    IdealState idealState = cache.getIdealState(targetResource);
-    if (idealState == null) {
-      return Collections.emptySet();
-    }
-
-    // Get the partitions on the target resource to use
-    Set<String> partitions = idealState.getPartitionSet();
-    List<String> targetPartitions = jobCfg.getTargetPartitions();
-    if (targetPartitions != null && !targetPartitions.isEmpty()) {
-      partitions.retainAll(targetPartitions);
-    }
-
-    // Based on state matches, add eligible instances
-    Set<String> eligibleInstances = Sets.newHashSet();
-    Set<String> targetStates = jobCfg.getTargetPartitionStates();
-    for (String partition : partitions) {
-      Map<String, String> stateMap =
-          currStateOutput.getCurrentStateMap(targetResource, new Partition(partition));
-      Map<String, String> pendingStateMap =
-          currStateOutput.getPendingStateMap(targetResource, new Partition(partition));
-      for (Map.Entry<String, String> e : stateMap.entrySet()) {
-        String instanceName = e.getKey();
-        String state = e.getValue();
-        String pending = pendingStateMap.get(instanceName);
-        if (pending != null) {
-          continue;
-        }
-        if (targetStates == null || targetStates.isEmpty() || targetStates.contains(state)) {
-          eligibleInstances.add(instanceName);
-        }
-      }
-    }
-    allInstances.retainAll(eligibleInstances);
-    return allInstances;
-  }
-
-  public interface RetryPolicy {
-    /**
-     * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently
-     * assigned
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can serve tasks
-     * @param origAssignment the unmodified assignment
-     * @return the adjusted assignment
-     */
-    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment);
-  }
-
-  private static class DefaultRetryReassigner implements RetryPolicy {
-    @Override
-    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> origAssignment) {
-      // Compute an increasing integer ID for each instance
-      BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
-      int instanceIndex = 0;
-      for (String instance : instances) {
-        instanceMap.put(instance, instanceIndex++);
-      }
-
-      // Move partitions
-      Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
-      for (Map.Entry<String, SortedSet<Integer>> e : origAssignment.entrySet()) {
-        String instance = e.getKey();
-        SortedSet<Integer> partitions = e.getValue();
-        Integer instanceId = instanceMap.get(instance);
-        if (instanceId != null) {
-          for (int p : partitions) {
-            // Determine for each partition if there have been failures with the current assignment
-            // strategy, and if so, force a shift in assignment for that partition only
-            int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p);
-            int newInstanceId = (instanceId + shiftValue) % instances.size();
-            String newInstance = instanceMap.inverse().get(newInstanceId);
-            if (newInstance == null) {
-              newInstance = instance;
-            }
-            if (!newAssignment.containsKey(newInstance)) {
-              newAssignment.put(newInstance, new TreeSet<Integer>());
-            }
-            newAssignment.get(newInstance).add(p);
-          }
-        } else {
-          // In case something goes wrong, just keep the previous assignment
-          newAssignment.put(instance, partitions);
-        }
-      }
-      return newAssignment;
-    }
-
-    /**
-     * In case tasks fail, we may not want to schedule them in the same place. This method allows us
-     * to compute a shifting value so that we can systematically choose other instances to try
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can be chosen
-     * @param p the partition to look up
-     * @return the shifting value
-     */
-    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, int p) {
-      int numAttempts = jobCtx.getPartitionNumAttempts(p);
-      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
-      int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1);
-      return numAttempts / (maxNumAttempts / numInstances);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
new file mode 100644
index 0000000..0e2ab15
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -0,0 +1,650 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.*;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.*;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Custom rebalancer implementation for the {@code Job} in task model.
+ */
+public class JobRebalancer extends TaskRebalancer {
+  private static final Logger LOG = Logger.getLogger(JobRebalancer.class);
+  private static TaskAssignmentCalculator fixTaskAssignmentCal =
+      new FixedTargetTaskAssignmentCalculator();
+  private static TaskAssignmentCalculator genericTaskAssignmentCal =
+      new GenericTaskAssignmentCalculator();
+
+  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
+
+  /** Computes the job's new assignment; every early exit returns buildEmptyAssignment(...). */
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+    final String jobName = resource.getResourceName();
+    LOG.debug("Computer Best Partition for job: " + jobName);
+
+    // Fetch job configuration
+    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName);
+    if (jobCfg == null) {
+      LOG.error("Job configuration is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+    String workflowResource = jobCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    if (workflowCfg == null) {
+      LOG.error("Workflow configuration is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+    if (workflowCtx == null) {
+      LOG.error("Workflow context is NULL for " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState != TargetState.START && targetState != TargetState.STOP) {
+      LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
+          + ".Stop scheduling job " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    TaskState jobState = workflowCtx.getJobState(jobName);
+    // The job is already in a final state (completed/failed): clean up, cancel its rebalance.
+    if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+      LOG.info("Job " + jobName + " is failed or already completed, clean up IS.");
+      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
+      _scheduledRebalancer.removeScheduledRebalance(jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    if (!isWorkflowReadyForSchedule(workflowCfg)) {
+      LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) {
+      LOG.info("Job is not ready to be scheduled " + jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
+
+    // Fetch any existing context information, else create one whose start time is now.
+    JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName);
+    if (jobCtx == null) {
+      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Grab the old assignment, or an empty one if it doesn't exist
+    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(jobName);
+    }
+
+    // Passed to computeResourceMapping to collect the partitions that must be explicitly dropped
+    // from the ideal state stored in zk. NOTE(review): the HELIX-230 remark that used to sit here
+    // appears to concern the previous-assignment fetch above -- confirm.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+    ResourceAssignment newAssignment =
+        computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, clusterData
+            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
+            clusterData);
+
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
+        taskIs.getRecord().getMapFields().remove(pName(jobName, pId));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName);
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Store the job context, the workflow context, and the new assignment as the previous one.
+    TaskUtil.setJobContext(_manager, jobName, jobCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    setPrevResourceAssignment(jobName, newAssignment);
+
+    LOG.debug("Job " + jobName + " new assignment " + Arrays
+        .toString(newAssignment.getMappedPartitions().toArray()));
+
+    return newAssignment;
+  }
+
+  /** Collects the participants assigned to INIT or RUNNING partitions of the other jobs. */
+  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
+      WorkflowConfig workflowCfg) {
+    Set<String> busyInstances = new HashSet<String>();
+    for (String otherJob : workflowCfg.getJobDag().getAllNodes()) {
+      // The current job itself and jobs without a stored context contribute nothing
+      JobContext otherCtx =
+          otherJob.equals(currentJobName) ? null : TaskUtil.getJobContext(_manager, otherJob);
+      if (otherCtx == null) {
+        continue;
+      }
+      for (int partition : otherCtx.getPartitionSet()) {
+        TaskPartitionState state = otherCtx.getPartitionState(partition);
+        boolean active = state == TaskPartitionState.INIT || state == TaskPartitionState.RUNNING;
+        if (active) {
+          busyInstances.add(otherCtx.getAssignedParticipant(partition));
+        }
+      }
+    }
+
+    return busyInstances;
+  }
+
+  /**
+   * Computes the next partition-to-instance assignment for a job and updates the job and
+   * workflow contexts along the way.
+   * <p>
+   * The method (1) records the job/workflow state implied by the workflow target state,
+   * (2) walks every partition found in the previous assignment on a live instance and decides,
+   * based on pending messages, requested states and the current state, whether to keep, stop,
+   * finish or retry it, (3) schedules a future rebalance for delayed retries, (4) marks the job
+   * (and possibly the workflow) COMPLETED and cleans up the job's ideal state when all partitions
+   * are done, and (5) hands out additional partitions to instances that have spare capacity when
+   * the target state is START.
+   *
+   * @param jobResource the name of the job resource
+   * @param workflowConfig the configuration of the workflow owning the job
+   * @param jobCfg the job configuration
+   * @param prevAssignment the assignment computed by the previous run
+   * @param liveInstances the currently live instances
+   * @param currStateOutput current, pending and requested states reported by the participants
+   * @param workflowCtx the workflow context; job and workflow state are updated in place
+   * @param jobCtx the job context; per-partition state is updated in place
+   * @param partitionsToDropFromIs output parameter that receives the ids of partitions which
+   *          this method decides should be dropped from the ideal state
+   * @param cache the cluster data snapshot, passed through to the assignment calculator
+   * @return the new assignment, or the result of {@code buildEmptyAssignment} when the job has
+   *         just been marked FAILED
+   */
+  private ResourceAssignment computeResourceMapping(String jobResource,
+      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      ClusterDataCache cache) {
+    TargetState jobTgtState = workflowConfig.getTargetState();
+    // Update running status in workflow context
+    if (jobTgtState == TargetState.STOP) {
+      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      // Workflow has been stopped if all in progress jobs are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    } else {
+      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of tasks that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Used to keep track of tasks that have failed, but whose failure is acceptable
+    Set<Integer> skippedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    // Instances that are busy with INIT/RUNNING partitions of other jobs in this workflow are
+    // neither inspected nor given new work below.
+    Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
+
+    // Process all the current assignments of tasks.
+    TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
+    Set<Integer> allPartitions =
+        taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+    // instance -> partitions it held in the previous assignment (one entry per live instance)
+    Map<String, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    long currentTime = System.currentTimeMillis();
+    for (String instance : taskAssignments.keySet()) {
+      if (excludedInstances.contains(instance)) {
+        continue;
+      }
+
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+      // TASK_ERROR, ERROR.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet) {
+        final String pName = pName(jobResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        Message pendingMessage =
+            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
+        if (pendingMessage != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state assignment from the previous ideal state.
+          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
+          if (stateMap != null) {
+            String prevState = stateMap.get(instance);
+            paMap.put(pId, new PartitionAssignment(instance, prevState));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format(
+                  "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                  pName, instance, prevState));
+            }
+          }
+
+          continue;
+        }
+
+        // NOTE(review): valueOf throws if getCurrentState returns null for this
+        // (partition, instance) - confirm a current state is always reported at this point.
+        TaskPartitionState currState =
+            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
+                pName), instance));
+        jobCtx.setPartitionState(pId, currState);
+
+        // Process any requested state transitions.
+        String requestedStateStr =
+            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
+        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
+          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          // The requested state is honored as-is, even when it equals the current state.
+          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format(
+              "Instance %s requested a state transition to %s for partition %s.", instance,
+              requestedState, pName));
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          // Keep the partition on this instance and drive it towards the workflow target state.
+          TaskPartitionState nextState;
+          if (jobTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+              nextState, instance));
+        }
+          break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          LOG.debug(String
+              .format(
+                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                  pName, currState));
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+        }
+          break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          LOG.debug(String.format(
+              "Task partition %s has error state %s. Marking as such in rebalancer context.",
+              pName, currState));
+          // Records the error state and finish time and counts this run as an attempt.
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the task as soon a single partition fails for a specified
+          // maximum number of attempts.
+          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+            // If the user does not require this task to succeed in order for the job to succeed,
+            // then we don't have to fail the job right now
+            boolean successOptional = false;
+            String taskId = jobCtx.getTaskIdForPartition(pId);
+            if (taskId != null) {
+              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+              if (taskConfig != null) {
+                successOptional = taskConfig.isSuccessOptional();
+              }
+            }
+
+            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
+            // to fail the job immediately
+            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
+              successOptional = true;
+            }
+
+            if (!successOptional) {
+              // Fail the job (and the workflow, if it is terminable), flag every partition with
+              // the error state without counting another attempt, and stop processing.
+              long finishTime = currentTime;
+              workflowCtx.setJobState(jobResource, TaskState.FAILED);
+              if (workflowConfig.isTerminable()) {
+                workflowCtx.setWorkflowState(TaskState.FAILED);
+                workflowCtx.setFinishTime(finishTime);
+              }
+              jobCtx.setFinishTime(finishTime);
+              markAllPartitionsError(jobCtx, currState, false);
+              addAllPartitions(allPartitions, partitionsToDropFromIs);
+
+              // remove IdealState of this job
+              TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+              return buildEmptyAssignment(jobResource, currStateOutput);
+            } else {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+            }
+          } else {
+            // Mark the task to be started at some later time (if enabled)
+            markPartitionDelayed(jobCfg, jobCtx, pId);
+          }
+        }
+          break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          LOG.debug(String.format(
+              "Task partition %s has state %s. It will be dropped from the current ideal state.",
+              pName, currState));
+        }
+          break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      // What remains in pSet is used below to work out the instance's spare capacity.
+      pSet.removeAll(donePartitions);
+    }
+
+    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
+    scheduleForNextTask(jobResource, jobCtx, currentTime);
+
+    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
+      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
+      jobCtx.setFinishTime(currentTime);
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(currentTime);
+      }
+
+      // remove IdealState of this job
+      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
+    }
+
+    // Make additional task assignments if needed.
+    if (jobTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
+      // This includes all completed, failed, delayed, and already assigned partitions.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+      excludeSet.addAll(skippedPartitions);
+      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
+      // Get instance->[partition, ...] mappings for the target resource.
+      Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
+          .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+              workflowConfig, workflowCtx, allPartitions, cache);
+      // Iterate over the live instances (the keys of taskAssignments); only those that also
+      // appear in the target assignment and are not busy with another job receive new tasks.
+      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+        String instance = entry.getKey();
+        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
+            .contains(instance)) {
+          continue;
+        }
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
+            String pName = pName(jobResource, pId);
+            // The assignment asks for RUNNING while the context records INIT for the partition.
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            excludeSet.add(pId);
+            jobCtx.setAssignedParticipant(pId, instance);
+            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                TaskPartitionState.RUNNING, instance));
+          }
+        }
+      }
+    }
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(jobResource);
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
+          ImmutableMap.of(pa._instance, pa._state));
+    }
+
+    return ra;
+  }
+
+  private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
+    // Clear current entries if they exist and are expired
+    long currentTime = now;
+    long scheduledTime = _scheduledRebalancer.getRebalanceTime(job);
+    if (scheduledTime > 0 && currentTime > scheduledTime) {
+      _scheduledRebalancer.removeScheduledRebalance(job);
+    }
+
+    // Figure out the earliest schedulable time in the future of a non-complete job
+    boolean shouldSchedule = false;
+    long earliestTime = Long.MAX_VALUE;
+    for (int p : jobCtx.getPartitionSet()) {
+      long retryTime = jobCtx.getNextRetryTime(p);
+      TaskPartitionState state = jobCtx.getPartitionState(p);
+      state = (state != null) ? state : TaskPartitionState.INIT;
+      Set<TaskPartitionState> errorStates =
+          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
+              TaskPartitionState.TIMED_OUT);
+      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
+        earliestTime = retryTime;
+        shouldSchedule = true;
+      }
+    }
+
+    // If any was found, then schedule it
+    if (shouldSchedule) {
+      _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime);
+    }
+  }
+
+  /**
+   * Reads the assignment that was stored for a job by the previous rebalance run.
+   *
+   * @param resourceName the name of the job
+   * @return the stored {@link ResourceAssignment}, or null if the property store has no record
+   */
+  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
+    String prevRaPath =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE);
+    ZNRecord stored =
+        _manager.getHelixPropertyStore().get(prevRaPath, null, AccessOption.PERSISTENT);
+    if (stored == null) {
+      return null;
+    }
+    return new ResourceAssignment(stored);
+  }
+
+  /**
+   * Persists the latest assignment of a job so that the next rebalance run can read it back.
+   *
+   * @param resourceName the name of the job
+   * @param ra           the {@link ResourceAssignment} to store
+   */
+  private void setPrevResourceAssignment(String resourceName,
+      ResourceAssignment ra) {
+    String prevRaPath =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE);
+    _manager.getHelixPropertyStore().set(prevRaPath, ra.getRecord(), AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Decides whether a job is finished.
+   * @param ctx The job context holding the per-partition states.
+   * @param allPartitions The set of partitions to check.
+   * @param skippedPartitions partitions that failed, but whose failure is acceptable
+   * @param cfg The job configuration, used to tell whether a partition has been given up on.
+   * @return true if every partition is either skipped, marked
+   *         {@link TaskPartitionState#COMPLETED} in the context, or has been given up on;
+   *         false otherwise.
+   */
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
+      Set<Integer> skippedPartitions, JobConfig cfg) {
+    for (Integer partitionId : allPartitions) {
+      TaskPartitionState partitionState = ctx.getPartitionState(partitionId);
+      if (skippedPartitions.contains(partitionId)
+          || partitionState == TaskPartitionState.COMPLETED) {
+        continue;
+      }
+      // Neither skipped nor completed: the job is only done if this partition was given up on.
+      if (!isTaskGivenup(ctx, cfg, partitionId)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Copies every partition id of one set into another.
+   *
+   * @param toAdd the partition ids to copy
+   * @param destination the set that receives the ids
+   */
+  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+    destination.addAll(toAdd);
+  }
+
+  /**
+   * Adds to {@code set} every given partition that the job context records as COMPLETED.
+   *
+   * @param set the set that receives the completed partition ids
+   * @param ctx the job context holding the per-partition states
+   * @param pIds the partition ids to inspect
+   */
+  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer partitionId : pIds) {
+      if (ctx.getPartitionState(partitionId) != TaskPartitionState.COMPLETED) {
+        continue;
+      }
+      set.add(partitionId);
+    }
+  }
+
+  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
+  }
+
+  /**
+   * Adds to {@code set} every given partition that has been tried the maximum number of times.
+   *
+   * @param set the set that receives the given-up partition ids
+   * @param ctx the job context holding the number of attempts per partition
+   * @param pIds the partition ids to inspect
+   * @param cfg the job configuration holding the maximum attempts per task
+   */
+  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
+      JobConfig cfg) {
+    for (Integer partitionId : pIds) {
+      if (!isTaskGivenup(ctx, cfg, partitionId)) {
+        continue;
+      }
+      set.add(partitionId);
+    }
+  }
+
+  /**
+   * Picks, in iteration order, up to {@code n} candidate partitions that are not excluded.
+   *
+   * @param candidatePartitions the partitions to choose from
+   * @param excluded the partitions that must not be chosen
+   * @param n the maximum number of partitions to return
+   * @return the chosen partitions; empty if {@code n} is not positive or nothing qualifies
+   */
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
+    List<Integer> picked = new ArrayList<Integer>();
+    for (Integer candidate : candidatePartitions) {
+      if (picked.size() >= n) {
+        // Quota reached, no need to look at the remaining candidates.
+        return picked;
+      }
+      if (excluded.contains(candidate)) {
+        continue;
+      }
+      picked.add(candidate);
+    }
+
+    return picked;
+  }
+
+  /**
+   * Sets the next retry time of a partition to its finish time plus the configured retry
+   * delay. Does nothing when the configured delay is not positive.
+   *
+   * @param cfg the job configuration holding the task retry delay
+   * @param ctx the job context to update
+   * @param p the partition id
+   */
+  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
+    long retryDelay = cfg.getTaskRetryDelay();
+    if (retryDelay > 0) {
+      ctx.setNextRetryTime(p, ctx.getPartitionFinishTime(p) + retryDelay);
+    }
+  }
+
+  private static void markPartitionCompleted(JobContext ctx, int pId) {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+      boolean incrementAttempts) {
+    ctx.setPartitionState(pId, state);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    if (incrementAttempts) {
+      ctx.incrementNumAttempts(pId);
+    }
+  }
+
+  /**
+   * Applies {@code markPartitionError} to every partition known to the job context.
+   *
+   * @param ctx the job context to update
+   * @param state the state to record for each partition
+   * @param incrementAttempts whether to count an attempt for each partition
+   */
+  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+      boolean incrementAttempts) {
+    for (int partitionId : ctx.getPartitionSet()) {
+      markPartitionError(ctx, partitionId, state, incrementAttempts);
+    }
+  }
+
+  /**
+   * Groups the partitions of an assignment by the instance they are mapped to.
+   * Every instance of {@code instanceList} gets an entry, possibly empty. Partitions outside
+   * {@code includeSet} and replicas on instances that are not in {@code instanceList} are
+   * ignored.
+   */
+  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+    Map<String, SortedSet<Integer>> byInstance = new HashMap<String, SortedSet<Integer>>();
+    for (String liveInstance : instanceList) {
+      byInstance.put(liveInstance, new TreeSet<Integer>());
+    }
+
+    for (Partition mapped : assignment.getMappedPartitions()) {
+      int partitionId = TaskUtil.getPartitionId(mapped.getPartitionName());
+      if (!includeSet.contains(partitionId)) {
+        continue;
+      }
+      for (String owner : assignment.getReplicaMap(mapped).keySet()) {
+        SortedSet<Integer> owned = byInstance.get(owner);
+        if (owned == null) {
+          // The owner is not one of the requested instances.
+          continue;
+        }
+        owned.add(partitionId);
+      }
+    }
+    return byInstance;
+  }
+
+  /**
+   * Returns the partitions whose next retry time still lies in the future.
+   *
+   * @param ctx the job context holding the next retry time per partition
+   * @param now the time to compare against, in milliseconds
+   * @return the partitions whose next retry time is later than {@code now}
+   */
+  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
+    Set<Integer> waiting = Sets.newHashSet();
+    for (int partitionId : ctx.getPartitionSet()) {
+      long readyAt = ctx.getNextRetryTime(partitionId);
+      if (readyAt > now) {
+        waiting.add(partitionId);
+      }
+    }
+    return waiting;
+  }
+
+  /**
+   * Selects the assignment calculator for a job: the generic one when the job configuration
+   * carries task configs, the fixed-target one otherwise.
+   *
+   * @param jobConfig the job configuration
+   * @return the calculator to use for the job
+   */
+  private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) {
+    Map<String, TaskConfig> taskConfigs = jobConfig.getTaskConfigMap();
+    boolean hasTaskConfigs = taskConfigs != null && !taskConfigs.isEmpty();
+    if (!hasTaskConfigs) {
+      return fixTaskAssignmentCal;
+    }
+    return genericTaskAssignmentCal;
+  }
+
+  /**
+   * Builds the partition name for a partition id of a resource, in the form
+   * {@code <resource>_<pId>}.
+   */
+  private String pName(String resource, int pId) {
+    StringBuilder name = new StringBuilder();
+    name.append(resource).append("_").append(pId);
+    return name.toString();
+  }
+
+  /**
+   * Immutable holder for the instance a partition is assigned to and the state it should have
+   * there.
+   */
+  private static class PartitionAssignment {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String assignedInstance, String targetState) {
+      this._instance = assignedInstance;
+      this._state = targetState;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
new file mode 100644
index 0000000..a3ed5ab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -0,0 +1,45 @@
+package org.apache.helix.task;
+
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Strategy for determining which task partitions a job consists of and how those partitions
+ * are distributed over instances.
+ */
+public abstract class TaskAssignmentCalculator {
+  /**
+   * Get all the task partitions that belong to a job
+   *
+   * @param jobCfg the job configuration
+   * @param jobCtx the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param cache cluster snapshot
+   * @return set of partition numbers
+   */
+  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+
+  /**
+   * Compute an assignment of task partitions to instances
+   *
+   * @param currStateOutput the current state of the instances
+   * @param prevAssignment the previous task partition assignment
+   * @param instances the instances
+   * @param jobCfg the job configuration
+   * @param jobContext the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param cache cluster snapshot
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
+      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      ClusterDataCache cache);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 654ba4e..9b64aec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -188,20 +188,21 @@ public class TaskDriver {
     LOG.info("Starting workflow " + flow.getName());
     flow.validate();
 
-    String flowName = flow.getName();
-
-    // first, add workflow config to ZK
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
+    // first, add workflow config.
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flow.getName()),
         flow.getWorkflowConfig().getResourceConfigMap());
 
-    // then schedule jobs
+    // then add all job configs.
     for (String job : flow.getJobConfigs().keySet()) {
-      JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
+      JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
       if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
-        builder.addTaskConfigs(flow.getTaskConfigs().get(job));
+        jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job));
       }
-      scheduleJob(job, builder.build());
+      addJobConfig(job, jobCfgBuilder.build());
     }
+
+    // Finally add workflow resource.
+    addWorkflowResource(flow.getName());
   }
 
   /** Creates a new named job queue (workflow) */
@@ -210,6 +211,7 @@ public class TaskDriver {
   }
 
   /** Flushes a named job queue */
+  // TODO: make sure the queue is stopped or completed before flushing it.
   public void flushQueue(String queueName) throws Exception {
     WorkflowConfig config =
         TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -351,54 +353,57 @@ public class TaskDriver {
     _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
   }
 
-  /** Remove the job name from the DAG from the queue configuration */
+  /**
+   * Remove the job name from the DAG from the queue configuration
+   */
   private void removeJobFromDag(final String queueName, final String jobName) {
     final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
 
-    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null.");
+          return null;
+        }
         // Add the node to the existing DAG
         JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
         Set<String> allNodes = jobDag.getAllNodes();
         if (!allNodes.contains(namespacedJobName)) {
-          LOG.warn("Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
-        } else {
-          String parent = null;
-          String child = null;
-          // remove the node from the queue
-          for (String node : allNodes) {
-            if (!node.equals(namespacedJobName)) {
-              if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
-                parent = node;
-                jobDag.removeParentToChild(parent, namespacedJobName);
-              } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
-                child = node;
-                jobDag.removeParentToChild(namespacedJobName, child);
-              }
-            }
-          }
-
-          if (parent != null && child != null) {
-            jobDag.addParentToChild(parent, child);
+          LOG.warn(
+              "Could not delete job from queue " + queueName + ", job " + jobName + " not exists");
+          return currentData;
+        }
+        String parent = null;
+        String child = null;
+        // remove the node from the queue
+        for (String node : allNodes) {
+          if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+            parent = node;
+            jobDag.removeParentToChild(parent, namespacedJobName);
+          } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) {
+            child = node;
+            jobDag.removeParentToChild(namespacedJobName, child);
           }
+        }
+        if (parent != null && child != null) {
+          jobDag.addParentToChild(parent, child);
+        }
+        jobDag.removeNode(namespacedJobName);
 
-          jobDag.removeNode(namespacedJobName);
-
-          // Save the updated DAG
-          try {
-            currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
-          } catch (Exception e) {
-            throw new IllegalStateException("Could not remove job " + jobName + " from DAG of queue " + queueName, e);
-          }
+        // Save the updated DAG
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalStateException(
+              "Could not remove job " + jobName + " from DAG of queue " + queueName, e);
         }
         return currentData;
       }
     };
 
     String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
-    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
-    if (!status) {
+    if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) {
       throw new IllegalArgumentException(
           "Could not remove job " + jobName + " from DAG of queue " + queueName);
     }
@@ -449,8 +454,12 @@ public class TaskDriver {
     // Create the job to ensure that it validates
     JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
 
-    // Add the job to the end of the queue in the DAG
     final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+
+    // add job config first.
+    addJobConfig(namespacedJobName, jobConfig);
+
+    // Add the job to the end of the queue in the DAG
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
@@ -495,22 +504,38 @@ public class TaskDriver {
       throw new IllegalArgumentException("Could not enqueue job");
     }
     // Schedule the job
-    scheduleJob(namespacedJobName, jobConfig);
+    TaskUtil.invokeRebalance(_accessor, queueName);
   }
 
-  /** Posts new job to cluster */
-  private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
-    // Set up job resource based on partitions from target resource
-    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
-    int numPartitions =
-        (numIndependentTasks > 0) ? numIndependentTasks : _admin
-            .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet()
-            .size();
-    _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+  /** Posts new workflow resource to cluster */
+  private void addWorkflowResource(String workflow) {
+    // Add workflow resource
+    _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
+
+    // Push out new ideal state for the workflow
+    CustomModeISBuilder IsBuilder = new CustomModeISBuilder(workflow);
+    IsBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK)
+        .setNumReplica(1).setNumPartitions(1)
+        .setStateModel(TaskConstants.STATE_MODEL_NAME)
+        .setDisableExternalView(true);
+
+    IdealState is = IsBuilder.build();
+    is.getRecord().setListField(workflow, new ArrayList<String>());
+    is.getRecord().setMapField(workflow, new HashMap<String, String>());
+    is.setRebalancerClassName(WorkflowRebalancer.class.getName());
+    _admin.setResourceIdealState(_clusterName, workflow, is);
+
+  }
+
+  /**
+   * Add new job config to cluster
+   */
+  private void addJobConfig(String jobName, JobConfig jobConfig) {
+    LOG.info("Add job configuration " + jobName);
 
     // Set the job configuration
     PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    HelixProperty resourceConfig = new HelixProperty(jobName);
     resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
     Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
     if (taskConfigMap != null) {
@@ -518,30 +543,10 @@ public class TaskDriver {
         resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
       }
     }
-    _accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
-    // Push out new ideal state based on number of target partitions
-    CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
-    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
-    builder.setNumReplica(1);
-    builder.setNumPartitions(numPartitions);
-    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
 
-    if (jobConfig.isDisableExternalView()) {
-      builder.setDisableExternalView(jobConfig.isDisableExternalView());
+    if (!_accessor.setProperty(keyBuilder.resourceConfig(jobName), resourceConfig)) {
+      LOG.error("Failed to add job configuration for job " + jobName);
     }
-
-    IdealState is = builder.build();
-    for (int i = 0; i < numPartitions; i++) {
-      is.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
-      is.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
-    }
-    if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
-      is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
-    } else {
-      is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName());
-    }
-    _admin.setResourceIdealState(_clusterName, jobResource, is);
   }
 
   /** Public method to resume a workflow/queue */
@@ -565,52 +570,47 @@ public class TaskDriver {
   private void setWorkflowTargetState(String workflowName, TargetState state) {
     setSingleWorkflowTargetState(workflowName, state);
 
-    // TODO: this is the temporary fix for current task rebalance implementation.
-    // We should fix this in new task framework implementation.
+    // TODO: just need to change the lastScheduledWorkflow.
     List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
     for (String resource : resources) {
       if (resource.startsWith(workflowName)) {
         setSingleWorkflowTargetState(resource, state);
       }
     }
-
-    /* TODO: use this code for new task framework.
-    // For recurring schedules, last scheduled incomplete workflow must also be handled
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
-    String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
-    if (lastScheduledWorkflow != null) {
-      WorkflowContext lastScheduledWorkflowCtx =
-          TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
-      if (lastScheduledWorkflowCtx != null && !(
-          lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
-              || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
-        setSingleWorkflowTargetState(lastScheduledWorkflow, state);
-      }
-    }
-    */
   }
 
   /** Helper function to change target state for a given workflow */
   private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
+    LOG.info("Set " + workflowName + " to target state " + state);
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
-        if (currentData != null){
+        if (currentData != null) {
           // Only update target state for non-completed workflows
           String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME);
           if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) {
             currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
+          } else {
+            LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime);
           }
+        } else {
+          LOG.error("TargetState DataUpdater: Fails to update target state " + currentData);
         }
         return currentData;
       }
     };
     List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
-    updaters.add(updater);
     List<String> paths = Lists.newArrayList();
-    paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
-    _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-    invokeRebalance();
+
+    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName);
+    if (_accessor.getProperty(cfgKey) != null) {
+      paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
+      updaters.add(updater);
+      _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+      TaskUtil.invokeRebalance(_accessor, workflowName);
+    } else {
+      LOG.error("Configuration path " + cfgKey + " not found!");
+    }
   }
 
   public void list(String resource) {
@@ -666,21 +666,6 @@ public class TaskDriver {
     }
   }
 
-  /**
-   * Hack to invoke rebalance until bug concerning resource config changes not driving rebalance is
-   * fixed
-   */
-  public void invokeRebalance() {
-    // find a task
-    for (String resource : _admin.getResourcesInCluster(_clusterName)) {
-      IdealState is = _admin.getResourceIdealState(_clusterName, resource);
-      if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
-        _accessor.updateProperty(_accessor.keyBuilder().idealStates(resource), is);
-        break;
-      }
-    }
-  }
-
   /** Constructs options set for all basic control messages */
   private static Options constructOptions() {
     Options options = new Options();


[2/3] helix git commit: [HELIX-617] Job IdealState is generated even the job is not running and not removed when it is completed.

Posted by ki...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 3842b66..1526883 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,679 +19,56 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
- * Custom rebalancer implementation for the {@code Task} state model.
+ * Abstract rebalancer class for the {@code Task} state model.
  */
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
 
-  // Management of already-scheduled rebalances across jobs
-  private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
-  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor();
-
   // For connection management
-  private HelixManager _manager;
-
-  /**
-   * Get all the partitions that should be created by this task
-   * @param jobCfg the task configuration
-   * @param jobCtx the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
-   * @return set of partition numbers
-   */
-  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+  protected HelixManager _manager;
+  protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer();
 
-  /**
-   * Compute an assignment of tasks to instances
-   * @param currStateOutput the current state of the instances
-   * @param prevAssignment the previous task partition assignment
-   * @param instances the instances
-   * @param jobCfg the task configuration
-   * @param jobContext the task context
-   * @param workflowCfg the workflow configuration
-   * @param workflowCtx the workflow context
-   * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
-   * @return map of instances to set of partition numbers
-   */
-  public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
-      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
-      Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      ClusterDataCache cache);
-
-  @Override
-  public void init(HelixManager manager) {
+  @Override public void init(HelixManager manager) {
     _manager = manager;
   }
 
-  @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
-    final String resourceName = resource.getResourceName();
-    LOG.debug("Computer Best Partition for resource: " + resourceName);
-
-    // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
-    if (jobCfg == null) {
-      LOG.debug("Job configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    String workflowResource = jobCfg.getWorkflow();
-
-    // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
-    if (workflowCfg == null) {
-      LOG.debug("Workflow configuration is NULL for " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
-
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      LOG.info("Workflow context for " + resourceName + " created!");
-    }
-
-    // check ancestor job status
-    int notStartedCount = 0;
-    int inCompleteCount = 0;
-    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
-      TaskState jobState = workflowCtx.getJobState(ancestor);
-      if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++notStartedCount;
-      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      }
-    }
-
-    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
-      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info(
-          "Workflow is marked as deleted " + workflowResource
-              + " cleaning up the workflow context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check if this workflow has been finished past its expiry.
-    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
-        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Workflow " + workflowResource
-          + " is completed and passed expiry time, cleaning up the workflow context.");
-      markForDeletion(_manager, workflowResource);
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Fetch any existing context information from the property store.
-    JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
-    if (jobCtx == null) {
-      jobCtx = new JobContext(new ZNRecord("TaskContext"));
-      jobCtx.setStartTime(System.currentTimeMillis());
-    }
-
-    // Check for expired jobs for non-terminable workflows
-    long jobFinishTime = jobCtx.getFinishTime();
-    if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
-        && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
-      LOG.info("Job " + resourceName
-          + " is completed and passed expiry time, cleaning up the job context.");
-      cleanup(_manager, resourceName, workflowCfg, workflowResource);
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // The job is already in a final state (completed/failed).
-    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
-        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
-      LOG.debug("Job " + resourceName + " is failed or already completed.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Check for readiness, and stop processing if it's not ready
-    boolean isReady =
-        scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
-    if (!isReady) {
-      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
-      return emptyAssignment(resourceName, currStateOutput);
-    }
-
-    // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
-    if (prevAssignment == null) {
-      prevAssignment = new ResourceAssignment(resourceName);
-    }
-
-    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
-    // is stored in zk.
-    // Fetch the previous resource assignment from the property store. This is required because of
-    // HELIX-230.
-    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
-
-    ResourceAssignment newAssignment =
-        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
-            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
-            clusterData);
-
-    if (!partitionsToDrop.isEmpty()) {
-      for (Integer pId : partitionsToDrop) {
-        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
-      accessor.setProperty(propertyKey, taskIs);
-    }
-
-    // Update rebalancer context, previous ideal state.
-    TaskUtil.setJobContext(_manager, resourceName, jobCtx);
-    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
-    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
-
-    LOG.debug("Job " + resourceName + " new assignment " + Arrays
-        .toString(newAssignment.getMappedPartitions().toArray()));
-
-    return newAssignment;
-  }
-
-  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
-      WorkflowConfig workflowCfg) {
-
-    Set<String> ret = new HashSet<String>();
-
-    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
-      if (jobName.equals(currentJobName)) {
-        continue;
-      }
-
-      JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
-      if (jobContext == null) {
-        continue;
-      }
-      for (int partition : jobContext.getPartitionSet()) {
-        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
-        if (partitionState == TaskPartitionState.INIT ||
-            partitionState == TaskPartitionState.RUNNING) {
-          ret.add(jobContext.getAssignedParticipant(partition));
-        }
-      }
-    }
-
-    return ret;
-  }
-
-  private ResourceAssignment computeResourceMapping(String jobResource,
-      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
-      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
-      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
-      ClusterDataCache cache) {
-    TargetState jobTgtState = workflowConfig.getTargetState();
-
-    // Update running status in workflow context
-    if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
-      // Workflow has been stopped if all jobs are stopped
-      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.STOPPED);
-      }
-    } else {
-      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
-      // Workflow is in progress if any task is in progress
-      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    }
-
-    // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<Integer>();
-
-    // Used to keep track of tasks that have failed, but whose failure is acceptable
-    Set<Integer> skippedPartitions = new HashSet<Integer>();
-
-    // Keeps a mapping of (partition) -> (instance, state)
-    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
-
-    Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
-
-    // Process all the current assignments of tasks.
-    Set<Integer> allPartitions =
-        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
-    Map<String, SortedSet<Integer>> taskAssignments =
-        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
-    long currentTime = System.currentTimeMillis();
-    for (String instance : taskAssignments.keySet()) {
-      if (excludedInstances.contains(instance)) {
-        continue;
-      }
-
-      Set<Integer> pSet = taskAssignments.get(instance);
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
-      Set<Integer> donePartitions = new TreeSet<Integer>();
-      for (int pId : pSet) {
-        final String pName = pName(jobResource, pId);
-
-        // Check for pending state transitions on this (partition, instance).
-        Message pendingMessage =
-            currStateOutput.getPendingState(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null) {
-          // There is a pending state transition for this (partition, instance). Just copy forward
-          // the state assignment from the previous ideal state.
-          Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
-          if (stateMap != null) {
-            String prevState = stateMap.get(instance);
-            paMap.put(pId, new PartitionAssignment(instance, prevState));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String
-                  .format(
-                      "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
-                      pName, instance, prevState));
-            }
-          }
-
-          continue;
-        }
-
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
-                pName), instance));
-        jobCtx.setPartitionState(pId, currState);
-
-        // Process any requested state transitions.
-        String requestedStateStr =
-            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
-        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
-          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
-          if (requestedState.equals(currState)) {
-            LOG.warn(String.format(
-                "Requested state %s is the same as the current state for instance %s.",
-                requestedState, instance));
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format(
-              "Instance %s requested a state transition to %s for partition %s.", instance,
-              requestedState, pName));
-          continue;
-        }
-
-        switch (currState) {
-        case RUNNING:
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState == TargetState.START) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
-          }
-
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-              nextState, instance));
-        }
-          break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          LOG.debug(String
-              .format(
-                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                  pName, currState));
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-          break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in rebalancer context.",
-              pName, currState));
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts.
-          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
-            // If the user does not require this task to succeed in order for the job to succeed,
-            // then we don't have to fail the job right now
-            boolean successOptional = false;
-            String taskId = jobCtx.getTaskIdForPartition(pId);
-            if (taskId != null) {
-              TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-              if (taskConfig != null) {
-                successOptional = taskConfig.isSuccessOptional();
-              }
-            }
-
-            // Similarly, if we have some leeway for how many tasks we can fail, then we don't have
-            // to fail the job immediately
-            if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
-              successOptional = true;
-            }
-
-            if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
-              markAllPartitionsError(jobCtx, currState, false);
-              addAllPartitions(allPartitions, partitionsToDropFromIs);
-              return emptyAssignment(jobResource, currStateOutput);
-            } else {
-              skippedPartitions.add(pId);
-              partitionsToDropFromIs.add(pId);
-            }
-          } else {
-            // Mark the task to be started at some later time (if enabled)
-            markPartitionDelayed(jobCfg, jobCtx, pId);
-          }
-        }
-          break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          LOG.debug(String.format(
-              "Task partition %s has state %s. It will be dropped from the current ideal state.",
-              pName, currState));
-        }
-          break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
-        }
-      }
-
-      // Remove the set of task partitions that are completed or in one of the error states.
-      pSet.removeAll(donePartitions);
-    }
-
-    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
-    scheduleForNextTask(jobResource, jobCtx, currentTime);
-
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-    }
-
-    // Make additional task assignments if needed.
-    if (jobTgtState == TargetState.START) {
-      // Contains the set of task partitions that must be excluded from consideration when making
-      // any new assignments.
-      // This includes all completed, failed, delayed, and already assigned partitions.
-      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
-      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
-      excludeSet.addAll(skippedPartitions);
-      excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
-      // Get instance->[partition, ...] mappings for the target resource.
-      Map<String, SortedSet<Integer>> tgtPartitionAssignments =
-          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
-      for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
-        String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
-          continue;
-        }
-        // Contains the set of task partitions currently assigned to the instance.
-        Set<Integer> pSet = entry.getValue();
-        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
-        if (numToAssign > 0) {
-          List<Integer> nextPartitions =
-              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
-          for (Integer pId : nextPartitions) {
-            String pName = pName(jobResource, pId);
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-            excludeSet.add(pId);
-            jobCtx.setAssignedParticipant(pId, instance);
-            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                TaskPartitionState.RUNNING, instance));
-          }
-        }
-      }
-    }
-
-    // Construct a ResourceAssignment object from the map of partition assignments.
-    ResourceAssignment ra = new ResourceAssignment(jobResource);
-    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
-      PartitionAssignment pa = e.getValue();
-      ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
-          ImmutableMap.of(pa._instance, pa._state));
-    }
-
-    return ra;
-  }
-
-  /**
-   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
-   * @param workflowCfg the workflow to check
-   * @param workflowCtx the current workflow context
-   * @param workflowResource the Helix resource associated with the workflow
-   * @param jobResource a job from the workflow
-   * @param cache the current snapshot of the cluster
-   * @return true if ready, false if not ready
-   */
-  private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      String workflowResource, String jobResource, ClusterDataCache cache) {
-    // Ignore non-scheduled workflows
-    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
-      return true;
-    }
-
-    // Figure out when this should be run, and if it's ready, then just run it
-    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
-    Date startTime = scheduleConfig.getStartTime();
-    long currentTime = new Date().getTime();
-    long delayFromStart = startTime.getTime() - currentTime;
-
-    if (delayFromStart <= 0) {
-      // Remove any timers that are past-time for this workflow
-      Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
-      if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-
-      // Recurring workflows are just templates that spawn new workflows
-      if (scheduleConfig.isRecurring()) {
-        // Skip scheduling this workflow if it's not in a start state
-        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
-          LOG.debug(
-              "Skip scheduling since the workflow has not been started " + workflowResource);
-          return false;
-        }
-
-        // Skip scheduling this workflow again if the previous run (if any) is still active
-        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
-        if (lastScheduled != null) {
-          WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
-          if (lastWorkflowCtx != null
-              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
-            return false;
-          }
-        }
-
-        // Figure out how many jumps are needed, thus the time to schedule the next workflow
-        // The negative of the delay is the amount of time past the start time
-        long period =
-            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
-        long offsetMultiplier = (-delayFromStart) / period;
-        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
-
-        // Now clone the workflow if this clone has not yet been created
-        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
-        // Now clone the workflow if this clone has not yet been created
-        String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
-        LOG.debug("Ready to start workflow " + newWorkflowName);
-        if (!newWorkflowName.equals(lastScheduled)) {
-          Workflow clonedWf =
-              TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
-                  timeToSchedule));
-          TaskDriver driver = new TaskDriver(_manager);
-          try {
-            // Start the cloned workflow
-            driver.start(clonedWf);
-          } catch (Exception e) {
-            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-          }
-          // Persist workflow start regardless of success to avoid retrying and failing
-          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
-        }
-
-        // Change the time to trigger the pipeline to that of the next run
-        startTime = new Date(timeToSchedule + period);
-        delayFromStart = startTime.getTime() - System.currentTimeMillis();
-      } else {
-        // This is a one-time workflow and is ready
-        return true;
-      }
-    }
-
-    scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
-    return false;
-  }
-
-  private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
-    // Do nothing if there is already a timer set for the this workflow with the same start time.
-    if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
-        || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
-      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
-      return;
-    }
-    LOG.info(
-        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
-            + " delay from start: " + delayFromStart);
-
-    // For workflows not yet scheduled, schedule them and record it
-    RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
-    SCHEDULED_TIMES.put(id, startTime);
-    SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
-  }
-
-  private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
-    // Clear current entries if they exist and are expired
-    long currentTime = now;
-    Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
-    if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
-      LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
-      SCHEDULED_TIMES.remove(jobResource);
-    }
-
-    // Figure out the earliest schedulable time in the future of a non-complete job
-    boolean shouldSchedule = false;
-    long earliestTime = Long.MAX_VALUE;
-    for (int p : ctx.getPartitionSet()) {
-      long retryTime = ctx.getNextRetryTime(p);
-      TaskPartitionState state = ctx.getPartitionState(p);
-      state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
-      if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
-        earliestTime = retryTime;
-        shouldSchedule = true;
-      }
-    }
-
-    // If any was found, then schedule it
-    if (shouldSchedule) {
-      long delay = earliestTime - currentTime;
-      Date startTime = new Date(earliestTime);
-      scheduleRebalance(jobResource, jobResource, startTime, delay);
-    }
-  }
-
-  /**
-   * Checks if the job has completed.
-   * @param ctx The rebalancer context.
-   * @param allPartitions The set of partitions to check.
-   * @param skippedPartitions partitions that failed, but whose failure is acceptable
-   * @return true if all task partitions have been marked with status
-   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
-   *         context, false otherwise.
-   */
-  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
-      Set<Integer> skippedPartitions, JobConfig cfg) {
-    for (Integer pId : allPartitions) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
-          && !isTaskGivenup(ctx, cfg, pId)) {
-        return false;
-      }
-    }
-    return true;
-  }
+  @Override public abstract ResourceAssignment computeBestPossiblePartitionState(
+      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+      CurrentStateOutput currStateOutput);
 
   /**
    * Checks if the workflow has completed.
+   *
    * @param ctx Workflow context containing job states
    * @param cfg Workflow config containing set of jobs
    * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
    */
-  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+  protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
     if (!cfg.isTerminable()) {
       return false;
     }
@@ -705,149 +82,23 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
   /**
    * Checks if the workflow has been stopped.
+   *
    * @param ctx Workflow context containing task states
    * @param cfg Workflow config containing set of tasks
    * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
    */
-  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+  protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
     for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState != null && jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
+          && jobState != TaskState.STOPPED)
         return false;
-      }
     }
     return true;
   }
 
-  private static void markForDeletion(HelixManager mgr, String resourceName) {
-    mgr.getConfigAccessor().set(
-        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
-        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
-  }
-
-  /**
-   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
-   * is the last remaining job in its workflow, and the workflow is terminable.
-   */
-  private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
-      String workflowResource) {
-    LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
-    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
-
-    // Remove any DAG references in workflow
-    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
-    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
-        for (String child : jobDag.getDirectChildren(resourceName)) {
-          jobDag.getChildrenToParents().get(child).remove(resourceName);
-        }
-        for (String parent : jobDag.getDirectParents(resourceName)) {
-          jobDag.getParentsToChildren().get(parent).remove(resourceName);
-        }
-        jobDag.getChildrenToParents().remove(resourceName);
-        jobDag.getParentsToChildren().remove(resourceName);
-        jobDag.getAllNodes().remove(resourceName);
-        try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
-        } catch (Exception e) {
-          LOG.equals("Could not update DAG for job: " + resourceName);
-        }
-        return currentData;
-      }
-    };
-    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
-        AccessOption.PERSISTENT);
-
-    // Delete resource configs.
-    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(cfgKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-          resourceName,
-          cfgKey));
-    }
-
-    // Delete property store information for this resource.
-    // For recurring workflow, it's OK if the node doesn't exist.
-    String propStoreKey = getRebalancerPropStoreKey(resourceName);
-    mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
-
-    // Delete the ideal state itself.
-    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
-    if (!accessor.removeProperty(isKey)) {
-      throw new RuntimeException(String.format(
-          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
-          resourceName, isKey));
-    }
-
-    // Delete dead external view
-    // because job is already completed, there is no more current state change
-    // thus dead external views removal will not be triggered
-    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
-    accessor.removeProperty(evKey);
-
-    LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
-
-    boolean lastInWorkflow = true;
-    for (String job : cfg.getJobDag().getAllNodes()) {
-      // check if property store information or resource configs exist for this job
-      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
-          AccessOption.PERSISTENT)
-          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
-          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
-        lastInWorkflow = false;
-        break;
-      }
-    }
-
-    // clean up workflow-level info if this was the last in workflow
-    if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
-      // delete workflow config
-      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
-      if (!accessor.removeProperty(workflowCfgKey)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowCfgKey));
-      }
-      // Delete property store information for this workflow
-      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
-      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
-        throw new RuntimeException(
-            String
-                .format(
-                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
-                    workflowResource, workflowPropStoreKey));
-      }
-      // Remove pending timer for this workflow if exists
-      if (SCHEDULED_TIMES.containsKey(workflowResource)) {
-        SCHEDULED_TIMES.remove(workflowResource);
-      }
-    }
-
-  }
-
-  private static String getRebalancerPropStoreKey(String resource) {
-    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
-  }
-
-  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().idealStates(resource);
-  }
-
-  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
-    return accessor.keyBuilder().resourceConfig(resource);
-  }
-
-  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
-    for (Integer pId : toAdd) {
-      destination.add(pId);
-    }
-  }
-
-  private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
+  protected ResourceAssignment buildEmptyAssignment(String name,
+      CurrentStateOutput currStateOutput) {
     ResourceAssignment assignment = new ResourceAssignment(name);
     Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
     for (Partition partition : partitions) {
@@ -861,164 +112,158 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return assignment;
   }
 
-  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
-      Iterable<Integer> pIds) {
-    for (Integer pId : pIds) {
-      TaskPartitionState state = ctx.getPartitionState(pId);
-      if (state == TaskPartitionState.COMPLETED) {
-        set.add(pId);
-      }
-    }
-  }
-
-  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
-    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
-  }
+  /**
+   * Check all the dependencies of a job to determine whether the job is ready to be scheduled.
+   *
+   * @param job
+   * @param workflowCfg
+   * @param workflowCtx
+   * @return true if the job is ready to be scheduled, false otherwise
+   */
+  protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
+    int notStartedCount = 0;
+    int inCompleteCount = 0;
 
-  // add all partitions that have been tried maxNumberAttempts
-  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
-    for (Integer pId : pIds) {
-      if (isTaskGivenup(ctx, cfg, pId)) {
-        set.add(pId);
+    for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
+      TaskState jobState = workflowCtx.getJobState(ancestor);
+      if (jobState == null || jobState == TaskState.NOT_STARTED) {
+        ++notStartedCount;
+      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
       }
     }
-  }
-
-  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
-      Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>();
-    for (Integer pId : candidatePartitions) {
-      if (result.size() >= n) {
-        break;
-      }
 
-      if (!excluded.contains(pId)) {
-        result.add(pId);
-      }
+    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+      LOG.debug(String
+          .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.",
+              job, notStartedCount, inCompleteCount));
+      return false;
     }
 
-    return result;
-  }
-
-  private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
-    long delayInterval = cfg.getTaskRetryDelay();
-    if (delayInterval <= 0) {
-      return;
-    }
-    long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
-    ctx.setNextRetryTime(p, nextStartTime);
+    return true;
   }
 
-  private static void markPartitionCompleted(JobContext ctx, int pId) {
-    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
+  /**
+   * Check if a workflow is ready to schedule.
+   *
+   * @param workflowCfg the workflow to check
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
+    Date startTime = workflowCfg.getStartTime();
+    // A workflow with no start time configured, or whose start time has passed, is ready to schedule.
+    return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
-      boolean incrementAttempts) {
-    ctx.setPartitionState(pId, state);
-    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    if (incrementAttempts) {
-      ctx.incrementNumAttempts(pId);
-    }
+  @Override public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
+    // All of the heavy lifting is in the ResourceAssignment computation,
+    // so this part can just be a no-op.
+    return currentIdealState;
   }
 
-  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
-      boolean incrementAttempts) {
-    for (int pId : ctx.getPartitionSet()) {
-      markPartitionError(ctx, pId, state, incrementAttempts);
-    }
-  }
+  // Management of already-scheduled rebalances across all task entities.
+  protected static class ScheduledRebalancer {
+    private class ScheduledTask {
+      long _startTime;
+      Future _future;
 
-  /**
-   * Return the assignment of task partitions per instance.
-   */
-  private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
-      Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
-    for (String instance : instanceList) {
-      result.put(instance, new TreeSet<Integer>());
-    }
+      public ScheduledTask(long _startTime, Future _future) {
+        this._startTime = _startTime;
+        this._future = _future;
+      }
 
-    for (Partition partition : assignment.getMappedPartitions()) {
-      int pId = pId(partition.getPartitionName());
-      if (includeSet.contains(pId)) {
-        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pList = result.get(instance);
-          if (pList != null) {
-            pList.add(pId);
-          }
-        }
+      public long getStartTime() {
+        return _startTime;
       }
-    }
-    return result;
-  }
 
-  private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
-    Set<Integer> nonReadyPartitions = Sets.newHashSet();
-    for (int p : ctx.getPartitionSet()) {
-      long toStart = ctx.getNextRetryTime(p);
-      if (now < toStart) {
-        nonReadyPartitions.add(p);
+      public Future getFuture() {
+        return _future;
       }
     }
-    return nonReadyPartitions;
-  }
-
-  /**
-   * Computes the partition name given the resource name and partition id.
-   */
-  protected static String pName(String resource, int pId) {
-    return resource + "_" + pId;
-  }
 
-  /**
-   * Extracts the partition id from the given partition name.
-   */
-  protected static int pId(String pName) {
-    String[] tokens = pName.split("_");
-    return Integer.valueOf(tokens[tokens.length - 1]);
-  }
-
-  /**
-   * An (instance, state) pair.
-   */
-  private static class PartitionAssignment {
-    private final String _instance;
-    private final String _state;
+    private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
+    private final ScheduledExecutorService _rebalanceExecutor =
+        Executors.newSingleThreadScheduledExecutor();
+
+    /**
+     * Add a future rebalance task for resource at given startTime
+     *
+     * @param resource
+     * @param startTime time in milliseconds
+     */
+    public void scheduleRebalance(HelixManager manager, String resource, long startTime) {
+      // Do nothing if there is already a timer set for this resource with the same start time.
+      ScheduledTask existTask = _rebalanceTasks.get(resource);
+      if (existTask != null && existTask.getStartTime() == startTime) {
+        LOG.debug("Schedule timer for job: " + resource + " is up to date.");
+        return;
+      }
 
-    private PartitionAssignment(String instance, String state) {
-      _instance = instance;
-      _state = state;
+      long delay = startTime - System.currentTimeMillis();
+      LOG.info("Schedule rebalance with job: " + resource + " at time: " + startTime + " delay: "
+          + delay);
+
+      // For a resource not yet scheduled at this start time, schedule it and record it
+      RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource);
+      ScheduledFuture future =
+          _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
+      ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
+      if (prevTask != null && !prevTask.getFuture().isDone()) {
+        if (!prevTask.getFuture().cancel(false)) {
+          LOG.warn("Failed to cancel scheduled timer task for " + resource);
+        }
+      }
     }
-  }
 
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    // All of the heavy lifting is in the ResourceAssignment computation,
-    // so this part can just be a no-op.
-    return currentIdealState;
-  }
+    /**
+     * Get the current schedule time for given resource.
+     *
+     * @param resource
+     * @return existing schedule time, or -1 if there is no scheduled task for this resource
+     */
+    public long getRebalanceTime(String resource) {
+      ScheduledTask task = _rebalanceTasks.get(resource);
+      if (task != null) {
+        return task.getStartTime();
+      }
+      return -1;
+    }
+
+    /**
+     * Remove and cancel the pending scheduled rebalance task, if any, for the given resource
+     *
+     * @param resource
+     */
+    public void removeScheduledRebalance(String resource) {
+      ScheduledTask existTask = _rebalanceTasks.remove(resource);
+      if (existTask != null && !existTask.getFuture().isDone()) {
+        if (!existTask.getFuture().cancel(true)) {
+          LOG.warn("Failed to cancel scheduled timer task for " + resource);
+        }
+        LOG.info(
+            "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
+                + resource);
+      }
+    }
 
-  /**
-   * The simplest possible runnable that will trigger a run of the controller pipeline
-   */
-  private static class RebalanceInvoker implements Runnable {
-    private final HelixManager _manager;
-    private final String _resource;
+    /**
+     * The simplest possible runnable that will trigger a run of the controller pipeline
+     */
+    private class RebalanceInvoker implements Runnable {
+      private final HelixManager _manager;
+      private final String _resource;
 
-    public RebalanceInvoker(HelixManager manager, String resource) {
-      _manager = manager;
-      _resource = resource;
-    }
+      public RebalanceInvoker(HelixManager manager, String resource) {
+        _manager = manager;
+        _resource = resource;
+      }
 
-    @Override
-    public void run() {
-      TaskUtil.invokeRebalance(_manager, _resource);
+      @Override public void run() {
+        TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index bb62de5..d804fab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
@@ -56,14 +57,13 @@ import com.google.common.collect.Maps;
 public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
   public static final String CONTEXT_NODE = "Context";
-  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
-
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
-   * @param accessor Accessor to access Helix configs
+   *
+   * @param accessor    Accessor to access Helix configs
    * @param jobResource The name of the job resource
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
-   *         otherwise.
+   * otherwise.
    */
   public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
     HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
@@ -85,10 +85,11 @@ public class TaskUtil {
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
-   * @param manager HelixManager object used to connect to Helix.
+   *
+   * @param manager     HelixManager object used to connect to Helix.
    * @param jobResource The name of the job resource.
    * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
-   *         otherwise.
+   * otherwise.
    */
   public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
     return getJobCfg(manager.getHelixDataAccessor(), jobResource);
@@ -96,12 +97,13 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
-   * @param cfgAccessor Config accessor to access Helix configs
-   * @param accessor Accessor to access Helix configs
-   * @param clusterName Cluster name
+   *
+   * @param cfgAccessor      Config accessor to access Helix configs
+   * @param accessor         Accessor to access Helix configs
+   * @param clusterName      Cluster name
    * @param workflowResource The name of the workflow resource.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
-   *         workflow, null otherwise.
+   * workflow, null otherwise.
    */
   public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor,
       HelixDataAccessor accessor, String clusterName, String workflowResource) {
@@ -117,10 +119,11 @@ public class TaskUtil {
 
   /**
    * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
-   * @param manager Helix manager object used to connect to Helix.
+   *
+   * @param manager          Helix manager object used to connect to Helix.
    * @param workflowResource The name of the workflow resource.
    * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
-   *         workflow, null otherwise.
+   * workflow, null otherwise.
    */
   public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
     return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(),
@@ -129,18 +132,19 @@ public class TaskUtil {
 
   /**
    * Request a state change for a specific task.
-   * @param accessor connected Helix data accessor
-   * @param instance the instance serving the task
+   *
+   * @param accessor  connected Helix data accessor
+   * @param instance  the instance serving the task
    * @param sessionId the current session of the instance
-   * @param resource the job name
+   * @param resource  the job name
    * @param partition the task partition name
-   * @param state the requested state
+   * @param state     the requested state
    * @return true if the request was persisted, false otherwise
    */
   public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
       String sessionId, String resource, String partition, TaskPartitionState state) {
-    LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
-        partition));
+    LOG.debug(
+        String.format("Requesting a state transition to %s for partition %s.", state, partition));
     try {
       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
       PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
@@ -149,16 +153,18 @@ public class TaskUtil {
 
       return accessor.updateProperty(key, currStateDelta);
     } catch (Exception e) {
-      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
-          state, partition), e);
+      LOG.error(String
+          .format("Error when requesting a state transition to %s for partition %s.", state,
+              partition), e);
       return false;
     }
   }
 
   /**
    * Get a Helix configuration scope at a resource (i.e. job and workflow) level
+   *
    * @param clusterName the cluster containing the resource
-   * @param resource the resource name
+   * @param resource    the resource name
    * @return instantiated {@link HelixConfigScope}
    */
   public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
@@ -167,51 +173,24 @@ public class TaskUtil {
   }
 
   /**
-   * Get the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
-      String resourceName) {
-    ZNRecord r =
-        manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-            null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param manager a connection to Helix
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
-      ResourceAssignment ra) {
-    manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  /**
    * Get the runtime context of a single job
+   *
    * @param propertyStore Property store for the cluster
-   * @param jobResource The name of the job
+   * @param jobResource   The name of the job
    * @return the {@link JobContext}, or null if none is available
    */
   public static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore,
       String jobResource) {
-    ZNRecord r =
-        propertyStore.get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+    ZNRecord r = propertyStore
+        .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
             null, AccessOption.PERSISTENT);
     return r != null ? new JobContext(r) : null;
   }
 
   /**
    * Get the runtime context of a single job
-   * @param manager a connection to Helix
+   *
+   * @param manager     a connection to Helix
    * @param jobResource the name of the job
    * @return the {@link JobContext}, or null if none is available
    */
@@ -221,34 +200,36 @@ public class TaskUtil {
 
   /**
    * Set the runtime context of a single job
-   * @param manager a connection to Helix
+   *
+   * @param manager     a connection to Helix
    * @param jobResource the name of the job
-   * @param ctx the up-to-date {@link JobContext} for the job
+   * @param ctx         the up-to-date {@link JobContext} for the job
    */
   public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
-    manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
-        ctx.getRecord(), AccessOption.PERSISTENT);
+    manager.getHelixPropertyStore()
+        .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+            ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   /**
    * Get the runtime context of a single workflow
-   * @param propertyStore Property store of the cluster
+   *
+   * @param propertyStore    Property store of the cluster
    * @param workflowResource The name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
   public static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
       String workflowResource) {
-    ZNRecord r =
-        propertyStore.get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
-                CONTEXT_NODE), null, AccessOption.PERSISTENT);
+    ZNRecord r = propertyStore.get(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
+        null, AccessOption.PERSISTENT);
     return r != null ? new WorkflowContext(r) : null;
   }
 
   /**
    * Get the runtime context of a single workflow
-   * @param manager a connection to Helix
+   *
+   * @param manager          a connection to Helix
    * @param workflowResource the name of the workflow
    * @return the {@link WorkflowContext}, or null if none is available
    */
@@ -258,9 +239,10 @@ public class TaskUtil {
 
   /**
    * Set the runtime context of a single workflow
-   * @param manager a connection to Helix
+   *
+   * @param manager          a connection to Helix
    * @param workflowResource the name of the workflow
-   * @param ctx the up-to-date {@link WorkflowContext} for the workflow
+   * @param ctx              the up-to-date {@link WorkflowContext} for the workflow
    */
   public static void setWorkflowContext(HelixManager manager, String workflowResource,
       WorkflowContext ctx) {
@@ -271,6 +253,7 @@ public class TaskUtil {
 
   /**
    * Get a workflow-qualified job name for a single-job workflow
+   *
    * @param singleJobWorkflow the name of the single-job workflow
    * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
    */
@@ -280,8 +263,9 @@ public class TaskUtil {
 
   /**
    * Get a workflow-qualified job name for a job in that workflow
+   *
    * @param workflowResource the name of the workflow
-   * @param jobName the un-namespaced name of the job
+   * @param jobName          the un-namespaced name of the job
    * @return The namespaced job name, which is just workflowResource_jobName
    */
   public static String getNamespacedJobName(String workflowResource, String jobName) {
@@ -290,8 +274,9 @@ public class TaskUtil {
 
   /**
    * Remove the workflow namespace from the job name
+   *
    * @param workflowResource the name of the workflow that owns the job
-   * @param jobName the namespaced job name
+   * @param jobName          the namespaced job name
    * @return the denamespaced job name, or the same job name if it is already denamespaced
    */
   public static String getDenamespacedJobName(String workflowResource, String jobName) {
@@ -305,6 +290,7 @@ public class TaskUtil {
 
   /**
    * Serialize a map of job-level configurations as a single string
+   *
    * @param commandConfig map of job config key to config value
    * @return serialized string
    */
@@ -321,6 +307,7 @@ public class TaskUtil {
 
   /**
    * Deserialize a single string into a map of job-level configurations
+   *
    * @param commandConfig the serialized job config map
    * @return a map of job config key to config value
    */
@@ -339,22 +326,27 @@ public class TaskUtil {
 
   /**
    * Trigger a controller pipeline execution for a given resource.
-   * @param manager Helix connection
+   *
+   * @param accessor Helix data accessor
    * @param resource the name of the resource changed to triggering the execution
    */
-  public static void invokeRebalance(HelixManager manager, String resource) {
+  public static void invokeRebalance(HelixDataAccessor accessor, String resource) {
     // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    LOG.info("invoke rebalance for " + resource);
     PropertyKey key = accessor.keyBuilder().idealStates(resource);
     IdealState is = accessor.getProperty(key);
-    if (is != null) {
-      accessor.updateProperty(key, is);
-      LOG.debug("invoke rebalance for " + key);
+    if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+      if (!accessor.updateProperty(key, is)) {
+        LOG.warn("Failed to invoke rebalance on resource " + resource);
+      }
+    } else {
+      LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
     }
   }
 
   /**
    * Get a ScheduleConfig from a workflow config string map
+   *
    * @param cfg the string map
    * @return a ScheduleConfig if one exists, otherwise null
    */
@@ -369,11 +361,11 @@ public class TaskUtil {
         return null;
       }
     }
-    if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT)
-        && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
-      return ScheduleConfig.recurringFromDate(startTime,
-          TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
-          Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
+    if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) && cfg
+        .containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
+      return ScheduleConfig
+          .recurringFromDate(startTime, TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
+              Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
     } else if (startTime != null) {
       return ScheduleConfig.oneTimeDelayedStart(startTime);
     }
@@ -382,10 +374,11 @@ public class TaskUtil {
 
   /**
    * Create a new workflow based on an existing one
-   * @param manager connection to Helix
+   *
+   * @param manager          connection to Helix
    * @param origWorkflowName the name of the existing workflow
-   * @param newWorkflowName the name of the new workflow
-   * @param newStartTime a provided start time that deviates from the desired start time
+   * @param newWorkflowName  the name of the new workflow
+   * @param newStartTime     a provided start time that deviates from the desired start time
    * @return the cloned workflow, or null if there was a problem cloning the existing one
    */
   public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
@@ -474,4 +467,61 @@ public class TaskUtil {
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));
   }
+
+  /**
+   * Cleans up IdealState and external view associated with a job/workflow resource.
+   */
+  public static void cleanupIdealStateExtView(HelixDataAccessor accessor, final String resourceName) {
+    LOG.info("Cleaning up idealstate and externalView for job: " + resourceName);
+
+    // Delete the ideal state itself.
+    PropertyKey isKey = accessor.keyBuilder().idealStates(resourceName);
+    if (accessor.getProperty(isKey) != null) {
+      if (!accessor.removeProperty(isKey)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+            resourceName, isKey));
+      }
+    } else {
+      LOG.warn(String.format("Idealstate for resource %s does not exist.", resourceName));
+    }
+
+    // Delete dead external view
+    // because job is already completed, there is no more current state change
+    // thus dead external views removal will not be triggered
+    PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
+    if (accessor.getProperty(evKey) != null) {
+      if (!accessor.removeProperty(evKey)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up resource %s. Failed to remove node %s from Helix.",
+            resourceName, evKey));
+      }
+    }
+
+    LOG.info(String
+        .format("Successfully clean up idealstate/externalView for resource %s.", resourceName));
+  }
+
+  /**
+   * Extracts the partition id from the given partition name.
+   *
+   * @param pName the partition name; the id is the text after the last '_'
+   * @return the partition id parsed as an integer
+   */
+  public static int getPartitionId(String pName) {
+    int index = pName.lastIndexOf("_");
+    if (index == -1) {
+      throw new HelixException("Invalid partition name " + pName);
+    }
+    return Integer.valueOf(pName.substring(index + 1));
+  }
+
+  public static String getWorkflowContextKey(String resource) {
+    // TODO: fix this to use the keyBuilder.
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+  }
+
+  public static PropertyKey getWorkflowConfigKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().resourceConfig(resource);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 259b72c..8ea2691 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -230,7 +230,7 @@ public class Workflow {
     protected Map<String, Map<String, String>> _jobConfigs;
     protected Map<String, List<TaskConfig>> _taskConfigs;
     protected ScheduleConfig _scheduleConfig;
-    protected long _expiry;
+    protected long _expiry = -1;
     protected Map<String, String> _cfgMap;
     protected int _parallelJobs = -1;
 
@@ -239,7 +239,7 @@ public class Workflow {
       _dag = new JobDag();
       _jobConfigs = new TreeMap<String, Map<String, String>>();
       _taskConfigs = new TreeMap<String, List<TaskConfig>>();
-      _expiry = WorkflowConfig.DEFAULT_EXPIRY;
+      _expiry = -1;
     }
 
     public Builder addConfig(String job, String key, String val) {
@@ -340,7 +340,7 @@ public class Workflow {
       if (_expiry > 0) {
         builder.setExpiry(_expiry);
       }
-      if (_parallelJobs != -1) {
+      if (_parallelJobs > 0) {
         builder.setParallelJobs(_parallelJobs);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
new file mode 100644
index 0000000..912f501
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -0,0 +1,412 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.*;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.*;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.log4j.Logger;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Custom rebalancer implementation for the {@code Workflow} in task state model.
+ */
+public class WorkflowRebalancer extends TaskRebalancer {
+  private static final Logger LOG = Logger.getLogger(WorkflowRebalancer.class);
+
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+    final String workflow = resource.getResourceName();
+    LOG.debug("Computer Best Partition for workflow: " + workflow);
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflow);
+    if (workflowCfg == null) {
+      LOG.warn("Workflow configuration is NULL for " + workflow);
+      return buildEmptyAssignment(workflow, currStateOutput);
+    }
+
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflow);
+    // Initialize workflow context if needed
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+      LOG.debug("Workflow context is created for " + workflow);
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+      cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+      return buildEmptyAssignment(workflow, currStateOutput);
+    }
+
+    if (targetState == TargetState.STOP) {
+      LOG.info("Workflow " + workflow + "is marked as stopped.");
+      // Workflow has been stopped if all jobs are stopped
+      // TODO: what should we do if workflowCtx is not set yet?
+      if (workflowCtx != null && isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+      return buildEmptyAssignment(workflow, currStateOutput);
+    }
+
+    long currentTime = System.currentTimeMillis();
+    // Check if workflow is completed and mark it if it is completed.
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+      if (isWorkflowComplete(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(currentTime);
+        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+      }
+    }
+
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+      LOG.info("Workflow " + workflow + " is completed.");
+      long expiryTime = workflowCfg.getExpiry();
+      // Check if this workflow has been finished past its expiry.
+      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
+        LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
+        cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+      } else {
+        // schedule future cleanup work
+        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
+        _scheduledRebalancer.scheduleRebalance(_manager, workflow, cleanupTime);
+      }
+      return buildEmptyAssignment(workflow, currStateOutput);
+    }
+
+    if (!isWorkflowReadyForSchedule(workflowCfg)) {
+      LOG.info("Workflow " + workflow + " is not ready to schedule");
+      // set the timer to trigger future schedule
+      _scheduledRebalancer
+          .scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime());
+      return buildEmptyAssignment(workflow, currStateOutput);
+    }
+
+    // Check for readiness, and stop processing if it's not ready
+    boolean isReady =
+        scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
+    if (isReady) {
+      // Schedule jobs from this workflow.
+      scheduleJobs(workflowCfg, workflowCtx);
+    } else {
+      LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
+    }
+
+    TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+    return buildEmptyAssignment(workflow, currStateOutput);
+  }
+
+  /**
+   * Figure out which jobs in the workflow should be run,
+   * and schedule each one that is ready
+   */
+  private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    if (scheduleConfig != null && scheduleConfig.isRecurring()) {
+      LOG.debug("Jobs from recurring workflow are not schedule-able");
+      return;
+    }
+
+    for (String job : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(job);
+      if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
+        LOG.debug("Job " + job + " is already started or completed.");
+        continue;
+      }
+      // check ancestor job status
+      if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
+        JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+        scheduleSingleJob(job, jobConfig);
+      }
+    }
+  }
+
+  /**
+   * Posts new job to cluster
+   */
+  private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
+    HelixAdmin admin = _manager.getClusterManagmentTool();
+
+    IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
+    if (jobIS != null) {
+      LOG.info("Job " + jobResource + " idealstate already exists!");
+      return;
+    }
+
+    // Set up job resource based on partitions from target resource
+    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
+    int numPartitions = (numIndependentTasks > 0) ?
+        numIndependentTasks :
+        admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource())
+            .getPartitionSet().size();
+    admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
+        TaskConstants.STATE_MODEL_NAME);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
+    // Push out new ideal state based on number of target partitions
+    CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
+    builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
+    builder.setNumReplica(1);
+    builder.setNumPartitions(numPartitions);
+    builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+
+    if (jobConfig.isDisableExternalView()) {
+      builder.setDisableExternalView(true);
+    }
+
+    jobIS = builder.build();
+    for (int i = 0; i < numPartitions; i++) {
+      jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<String>());
+      jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<String, String>());
+    }
+    jobIS.setRebalancerClassName(JobRebalancer.class.getName());
+    admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
+  }
+
+  /**
+   * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
+   *
+   * @param workflow the Helix resource associated with the workflow
+   * @param workflowCfg  the workflow to check
+   * @param workflowCtx  the current workflow context
+   * @return true if the workflow is ready for schedule, false if not ready
+   */
+  private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
+      WorkflowContext workflowCtx) {
+    // non-scheduled workflow is ready to run immediately.
+    if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
+      return true;
+    }
+
+    // Figure out when this should be run, and if it's ready, then just run it
+    ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
+    Date startTime = scheduleConfig.getStartTime();
+    long currentTime = new Date().getTime();
+    long delayFromStart = startTime.getTime() - currentTime;
+
+    if (delayFromStart <= 0) {
+      // Recurring workflows are just templates that spawn new workflows
+      if (scheduleConfig.isRecurring()) {
+        // Skip scheduling this workflow if it's not in a start state
+        if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
+          return false;
+        }
+
+        // Skip scheduling this workflow again if the previous run (if any) is still active
+        String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
+        if (lastScheduled != null) {
+          WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
+            return false;
+          }
+        }
+
+        // Figure out how many jumps are needed, thus the time to schedule the next workflow
+        // The negative of the delay is the amount of time past the start time
+        long period =
+            scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
+        long offsetMultiplier = (-delayFromStart) / period;
+        long timeToSchedule = period * offsetMultiplier + startTime.getTime();
+
+        // Now clone the workflow if this clone has not yet been created
+        DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
+        LOG.debug("Ready to start workflow " + newWorkflowName);
+        if (!newWorkflowName.equals(lastScheduled)) {
+          Workflow clonedWf = TaskUtil
+              .cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
+          TaskDriver driver = new TaskDriver(_manager);
+          try {
+            // Start the cloned workflow
+            driver.start(clonedWf);
+          } catch (Exception e) {
+            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+          }
+          // Persist workflow start regardless of success to avoid retrying and failing
+          workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
+          TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+        }
+
+        // Change the time to trigger the pipeline to that of the next run
+        _scheduledRebalancer.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
+      } else {
+        // one time workflow.
+        // Remove any timers that are past-time for this workflow
+        long scheduledTime = _scheduledRebalancer.getRebalanceTime(workflow);
+        if (scheduledTime > 0 && currentTime > scheduledTime) {
+          _scheduledRebalancer.removeScheduledRebalance(workflow);
+        }
+        return true;
+      }
+    } else {
+      // set the timer to trigger future schedule
+      _scheduledRebalancer.scheduleRebalance(_manager, workflow, startTime.getTime());
+    }
+
+    return false;
+  }
+
+  /**
+   * Cleans up workflow configs and workflow contexts associated with this workflow,
+   * including all job-level configs and context, plus workflow-level information.
+   */
+  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
+      WorkflowContext workflowCtx) {
+    LOG.info("Cleaning up workflow: " + workflow);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    /*
+    if (workflowCtx != null && workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+      LOG.error("Workflow " + workflow + " has not completed, abort the clean up task.");
+      return;
+    }*/
+
+    for (String job : workflowcfg.getJobDag().getAllNodes()) {
+      cleanupJob(job, workflow);
+    }
+
+    // clean up workflow-level info if this was the last in workflow
+    if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
+      // clean up IS & EV
+      TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), workflow);
+
+      // delete workflow config
+      PropertyKey workflowCfgKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
+      if (accessor.getProperty(workflowCfgKey) != null) {
+        if (!accessor.removeProperty(workflowCfgKey)) {
+          LOG.error(String.format(
+              "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix.",
+              workflow, workflowCfgKey));
+        }
+      }
+      // Delete workflow context
+      String workflowPropStoreKey = TaskUtil.getWorkflowContextKey(workflow);
+      LOG.info("Removing workflow context: " + workflowPropStoreKey);
+      if (!_manager.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+            workflow, workflowPropStoreKey));
+      }
+
+      // Remove pending timer task for this workflow if exists
+      _scheduledRebalancer.removeScheduledRebalance(workflow);
+    }
+  }
+
+
+  /**
+   * Cleans up a single job: its ideal state and external view, its references in the
+   * owning workflow's DAG, its config and context, and any pending rebalance timer.
+   */
+  private void cleanupJob(final String job, String workflow) {
+    LOG.info("Cleaning up job: " + job + " in workflow: " + workflow);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Remove any idealstate and externalView.
+    TaskUtil.cleanupIdealStateExtView(accessor, job);
+
+    // Remove DAG references in workflow
+    PropertyKey workflowKey = TaskUtil.getWorkflowConfigKey(accessor, workflow);
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+          for (String child : jobDag.getDirectChildren(job)) {
+            jobDag.getChildrenToParents().get(child).remove(job);
+          }
+          for (String parent : jobDag.getDirectParents(job)) {
+            jobDag.getParentsToChildren().get(parent).remove(job);
+          }
+          jobDag.getChildrenToParents().remove(job);
+          jobDag.getParentsToChildren().remove(job);
+          jobDag.getAllNodes().remove(job);
+          try {
+            currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          } catch (Exception e) {
+            LOG.error("Could not update DAG for job: " + job, e);
+          }
+        } else {
+          LOG.error("Could not update DAG for job: " + job + " ZNRecord is null.");
+        }
+        return currentData;
+      }
+    };
+    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
+        AccessOption.PERSISTENT);
+
+    // Delete job configs.
+    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(accessor, job);
+    if (accessor.getProperty(cfgKey) != null) {
+      if (!accessor.removeProperty(cfgKey)) {
+        LOG.error(String.format(
+            "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
+            job, cfgKey));
+      }
+    }
+
+    // Delete job context
+    // For recurring workflow, it's OK if the node doesn't exist.
+    String propStoreKey = TaskUtil.getWorkflowContextKey(job);
+    if (!_manager.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+      LOG.warn(String.format(
+          "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix.",
+          job, propStoreKey));
+    }
+
+    LOG.info(String.format("Successfully cleaned up job context %s.", job));
+
+    _scheduledRebalancer.removeScheduledRebalance(job);
+  }
+
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // Nothing to do here with workflow resource.
+    return currentIdealState;
+  }
+}