You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/29 20:44:25 UTC

helix git commit: Batch write operations of task framework context update

Repository: helix
Updated Branches:
  refs/heads/master 5033785c2 -> 923002e8e


Batch write operations of task framework context update

The existing Task Framework pipeline's performance is limited by Workflow/Job Context updates to ZK. Currently, the controller writes a context back to ZK even after a trivial change.

As a first step, we batch all of these write operations into a single synchronous batch write. In the future, this should become an asynchronous batch write, combined with a no-read-from-context feature.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/923002e8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/923002e8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/923002e8

Branch: refs/heads/master
Commit: 923002e8e355bdff87bb62f2f1fd1939b66641ec
Parents: 5033785
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Oct 29 13:43:04 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 13:43:04 2018 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueueResource.java      |  8 +-
 .../webapp/resources/TestJobQueuesResource.java |  2 +-
 .../helix/common/caches/TaskDataCache.java      | 77 ++++++++++++++---
 .../controller/GenericHelixController.java      |  2 +
 .../controller/stages/ClusterDataCache.java     | 14 +--
 .../stages/task/TaskPersistDataStage.java       | 29 +++++++
 .../helix/task/DeprecatedTaskRebalancer.java    |  7 +-
 .../org/apache/helix/task/JobRebalancer.java    |  5 +-
 .../apache/helix/task/WorkflowRebalancer.java   | 45 +++++++---
 .../stages/TestTaskPersistDataStage.java        | 91 ++++++++++++++++++++
 .../helix/integration/task/TaskTestUtil.java    |  2 +
 11 files changed, 236 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index a41fbb4..93c52db 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -46,7 +46,7 @@ import java.util.Map;
  * Class for server-side resource at <code>"/clusters/{clusterName}/jobQueues/{jobQueue}"
  * <p>
  * <li>GET list job queue info
- * <li>POST start a new job in a job queue, or stop/resume/flush/delete a job queue
+ * <li>POST start a new job in a job queue, or stop/resume/persistDataChanges/delete a job queue
  */
 public class JobQueueResource extends ServerResource {
   private final static Logger LOG = LoggerFactory.getLogger(JobQueueResource.class);
@@ -113,7 +113,7 @@ public class JobQueueResource extends ServerResource {
   }
 
   /**
-   * Start a new job in a job queue, or stop/resume/flush/delete a job queue
+   * Start a new job in a job queue, or stop/resume/persistDataChanges/delete a job queue
    * <p>
    * Usage:
    * <p>
@@ -124,8 +124,8 @@ public class JobQueueResource extends ServerResource {
    * input.txt: <code>jsonParameters={"command":"start"}&newJob={newJobConfig.yaml}
    * <p>
    * For newJobConfig.yaml, see {@link Workflow#parse(String)}
-   * <li>Stop/resume/flush/delete a job queue:
-   * <code>curl -d 'jsonParameters={"command":"{stop/resume/flush/delete}"}'
+   * <li>Stop/resume/persistDataChanges/delete a job queue:
+   * <code>curl -d 'jsonParameters={"command":"{stop/resume/persistDataChanges/delete}"}'
    * -H "Content-Type: application/json" http://{host:port}/clusters/{clusterName}/jobQueues/{jobQueue}
    */
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 131ef88..81597b2 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -186,7 +186,7 @@ public class TestJobQueuesResource extends AdminTestBase {
     LOG.info("Resumed job-queue, ret: " + postRet);
 
     // Flush job queue
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "flush");
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "persistDataChanges");
     postBody =
         JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap);
     postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index c706db3..f8995f6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -22,16 +22,16 @@ package org.apache.helix.common.caches;
 import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
@@ -39,7 +39,6 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +53,8 @@ public class TaskDataCache extends AbstractDataCache {
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
   private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
+  private Set<String> _contextToUpdate = new HashSet<>();
+  private Set<String> _contextToRemove = new HashSet<>();
   // The following fields have been added for quota-based task scheduling
   private final AssignableInstanceManager _assignableInstanceManager = new AssignableInstanceManager();
 
@@ -186,27 +187,59 @@ public class TaskDataCache extends AbstractDataCache {
   /**
    * Update context of the Job
    */
-  public void updateJobContext(String resourceName, JobContext jobContext,
-      HelixDataAccessor accessor) {
-    updateContext(resourceName, jobContext.getRecord(), accessor);
+  public void updateJobContext(String resourceName, JobContext jobContext) {
+    updateContext(resourceName, jobContext.getRecord());
   }
 
   /**
    * Update context of the Workflow
    */
-  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
-      HelixDataAccessor accessor) {
-    updateContext(resourceName, workflowContext.getRecord(), accessor);
+  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext) {
+    updateContext(resourceName, workflowContext.getRecord());
   }
 
   /**
    * Update context of the Workflow or Job
    */
-  private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) {
-    String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
-        TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
-    accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT);
+  private void updateContext(String resourceName, ZNRecord record) {
     _contextMap.put(resourceName, record);
+    _contextToUpdate.add(resourceName);
+  }
+
+  public void persistDataChanges(HelixDataAccessor accessor) {
+    // Flush Context
+    List<String> contextUpdatePaths = new ArrayList<>();
+    List<ZNRecord> contextUpdateData = new ArrayList<>();
+    List<String> contextUpdateNames = new ArrayList<>(_contextToUpdate);
+    for (String resourceName : contextUpdateNames) {
+      if (_contextMap.get(resourceName) != null && !_contextToRemove.contains(resourceName)) {
+        contextUpdatePaths.add(getContextPath(resourceName));
+        contextUpdateData.add(_contextMap.get(resourceName));
+      }
+    }
+
+    boolean[] updateSuccess = accessor.getBaseDataAccessor()
+        .setChildren(contextUpdatePaths, contextUpdateData, AccessOption.PERSISTENT);
+
+    for (int i = 0; i < updateSuccess.length; i++) {
+      if (updateSuccess[i]) {
+        _contextToUpdate.remove(contextUpdateNames.get(i));
+      }
+    }
+
+    // Delete contexts
+    List<String> contextToRemove = new ArrayList<>();
+    List<String> contextToRemoveNames = new ArrayList<>(_contextToRemove);
+    for (String resourceName : contextToRemoveNames) {
+      contextToRemove.add(getContextPath(resourceName));
+    }
+
+
+    // The current implementation is a stateless operation, since Helix reads all the contexts
+    // back and redoes the work. If a removal fails in this round, it can be retried in the next
+    // round.
+    // It is also fine if the context has already been removed.
+    accessor.getBaseDataAccessor().remove(contextToRemove, AccessOption.PERSISTENT);
   }
 
   /**
@@ -225,6 +258,17 @@ public class TaskDataCache extends AbstractDataCache {
     return _assignableInstanceManager;
   }
 
+  /**
+   * Remove Workflow or Job context from cache
+   * @param resourceName
+   */
+  public void removeContext(String resourceName) {
+    if (_contextMap.containsKey(resourceName)) {
+      _contextMap.remove(resourceName);
+      _contextToRemove.add(resourceName);
+    }
+  }
+
   @Override
   public String toString() {
     return "TaskDataCache{"
@@ -234,4 +278,9 @@ public class TaskDataCache extends AbstractDataCache {
         + ", _clusterName='" + _clusterName
         + '\'' + '}';
   }
+
+  private String getContextPath(String resourceName) {
+    return String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
+        TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b7934b5..6a8fedd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -51,6 +51,7 @@ import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
 import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
 import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
 import org.apache.helix.controller.stages.task.TaskSchedulingStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -316,6 +317,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
       rebalancePipeline.addStage(new TaskSchedulingStage());
+      rebalancePipeline.addStage(new TaskPersistDataStage());
       rebalancePipeline.addStage(new TaskGarbageCollectionStage());
       rebalancePipeline.addStage(new TaskMessageGenerationPhase());
       rebalancePipeline.addStage(new TaskMessageDispatchStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index da22c5e..2299f92 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -760,17 +760,19 @@ public class ClusterDataCache extends AbstractDataCache {
   /**
    * Update context of the Job
    */
-  public void updateJobContext(String resourceName, JobContext jobContext,
-      HelixDataAccessor accessor) {
-    _taskDataCache.updateJobContext(resourceName, jobContext, accessor);
+  public void updateJobContext(String resourceName, JobContext jobContext) {
+    _taskDataCache.updateJobContext(resourceName, jobContext);
   }
 
   /**
    * Update context of the Workflow
    */
-  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
-      HelixDataAccessor accessor) {
-    _taskDataCache.updateWorkflowContext(resourceName, workflowContext, accessor);
+  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext) {
+    _taskDataCache.updateWorkflowContext(resourceName, workflowContext);
+  }
+
+  public TaskDataCache getTaskDataCache() {
+    return _taskDataCache;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
new file mode 100644
index 0000000..c14b27d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskPersistDataStage.java
@@ -0,0 +1,29 @@
+package org.apache.helix.controller.stages.task;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskPersistDataStage extends AbstractBaseStage {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskPersistDataStage.class);
+
+  @Override
+  public void process(ClusterEvent event) {
+    LOG.info("START TaskPersistDataStage.process()");
+    long startTime = System.currentTimeMillis();
+
+    // Persist partition assignment of resources.
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    cache.getTaskDataCache().persistDataChanges(manager.getHelixDataAccessor());
+
+    long endTime = System.currentTimeMillis();
+    LOG.info(
+        "END TaskPersistDataStage.process() for cluster " + cache.getClusterName() + " took " + (
+            endTime - startTime) + " ms");
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 9903117..e02e8f2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -248,9 +248,9 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
     }
 
     // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(resourceName, jobCtx, _manager.getHelixDataAccessor());
+    clusterData.updateJobContext(resourceName, jobCtx);
     clusterData
-        .updateWorkflowContext(workflowResource, workflowCtx, _manager.getHelixDataAccessor());
+        .updateWorkflowContext(workflowResource, workflowCtx);
 
     setPrevResourceAssignment(_manager, resourceName, newAssignment);
 
@@ -639,8 +639,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
-          cache.updateWorkflowContext(workflowResource, workflowCtx,
-              _manager.getHelixDataAccessor());
+          cache.updateWorkflowContext(workflowResource, workflowCtx);
         }
 
         // Change the time to trigger the pipeline to that of the next run

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 4dbf0a0..abc260a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -194,9 +194,8 @@ public class JobRebalancer extends TaskRebalancer {
     }
 
     // Update Workflow and Job context in data cache and ZK.
-    clusterData.updateJobContext(jobName, jobCtx, _manager.getHelixDataAccessor());
-    clusterData.updateWorkflowContext(workflowResource, workflowCtx,
-        _manager.getHelixDataAccessor());
+    clusterData.updateJobContext(jobName, jobCtx);
+    clusterData.updateWorkflowContext(workflowResource, workflowCtx);
 
     setPrevResourceAssignment(jobName, newAssignment);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 6d1bed5..6851475 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -37,6 +37,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
@@ -77,7 +78,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
-      cleanupWorkflow(workflow, workflowCfg);
+      cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -91,7 +92,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
           && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
         workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
-        clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
+        clusterData.updateWorkflowContext(workflow, workflowCtx);
       }
 
       // We should not return after setting timeout, as in case the workflow is stopped already
@@ -108,7 +109,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
       if (isWorkflowStopped(workflowCtx, workflowCfg)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
-        clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
+        clusterData.updateWorkflowContext(workflow, workflowCtx);
       }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
@@ -125,7 +126,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
         workflowCfg, clusterData.getJobConfigMap(), clusterData)) {
       workflowCtx.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowCtx, workflowCfg);
-      clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
+      clusterData.updateWorkflowContext(workflow, workflowCtx);
     }
 
     // Step 5: Handle finished workflows
@@ -135,7 +136,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       // Check if this workflow has been finished past its expiry.
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
         LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
-        cleanupWorkflow(workflow, workflowCfg);
+        cleanupWorkflow(workflow, workflowCfg, clusterData.getTaskDataCache());
       } else {
         // schedule future cleanup work
         long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -171,7 +172,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       }
     }
 
-    clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
+    clusterData.updateWorkflowContext(workflow, workflowCtx);
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
@@ -394,13 +395,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
           Workflow clonedWf =
               cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
           TaskDriver driver = new TaskDriver(_manager);
-          try {
-            // Start the cloned workflow
-            driver.start(clonedWf);
-          } catch (Exception e) {
-            LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
-            _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(),
-                TaskState.FAILED);
+          if (clonedWf != null) {
+            try {
+              // Start the cloned workflow
+              driver.start(clonedWf);
+            } catch (Exception e) {
+              LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+              _clusterStatusMonitor.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
+            }
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
@@ -508,7 +510,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * contexts associated with this workflow, and all jobs information, including their configs,
    * context, IS and EV.
    */
-  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
+  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
+      TaskDataCache taskDataCache) {
     LOG.info("Cleaning up workflow: " + workflow);
 
     if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
@@ -521,6 +524,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
       if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
           _manager.getHelixPropertyStore(), workflow, jobs)) {
         LOG.warn("Failed to clean up workflow " + workflow);
+      } else {
+        // Only remove contexts from the cache when the entire workflow removal succeeds.
+        // Otherwise, the batch write would delete all the contexts even though the Configs and
+        // IdealStates still exist, and all the workflows and jobs would be rescheduled again.
+        removeContexts(workflow, jobs, taskDataCache);
       }
     } else {
       LOG.info("Did not clean up workflow " + workflow
@@ -528,6 +536,15 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
   }
 
+  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
+    if (jobs != null) {
+      for (String job : jobs) {
+        cache.removeContext(job);
+      }
+    }
+    cache.removeContext(workflow);
+  }
+
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
new file mode 100644
index 0000000..8fcedfa
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
+import org.apache.helix.participant.MockZKHelixManager;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskPersistDataStage extends ZkTestBase {
+  private String CLUSTER_NAME = "TestCluster_" + TestHelper.getTestClassName();
+  private HelixManager _helixManager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() {
+    _helixManager = new MockZKHelixManager(CLUSTER_NAME, "MockInstance", InstanceType.ADMINISTRATOR,
+        _gZkClient);
+    _driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
+  }
+
+  @Test
+  public void testTaskContextUpdate() {
+    ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+    event.addAttribute(AttributeName.helixmanager.name(), _helixManager);
+    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
+
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    TaskDataCache taskDataCache = cache.getTaskDataCache();
+    String testWorkflow = TestHelper.getTestMethodName();
+    String testJobPrefix = testWorkflow + "_Job_";
+
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(testWorkflow));
+    wfCtx.setJobState(testJobPrefix + "0", TaskState.IN_PROGRESS);
+    wfCtx.setJobState(testJobPrefix + "1", TaskState.COMPLETED);
+    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    wfCtx.setName(testWorkflow);
+    wfCtx.setStartTime(System.currentTimeMillis());
+
+    JobContext jbCtx0 = new JobContext(new ZNRecord(testJobPrefix + "0"));
+    jbCtx0.setName(testJobPrefix + "0");
+    jbCtx0.setStartTime(System.currentTimeMillis());
+    jbCtx0.setPartitionState(0, TaskPartitionState.RUNNING);
+
+    JobContext jbCtx1 = new JobContext((new ZNRecord(testJobPrefix + "1")));
+    jbCtx1.setName(testJobPrefix + "1");
+    jbCtx1.setStartTime(System.currentTimeMillis());
+    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
+    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
+    taskDataCache.updateJobContext(testJobPrefix + "1", jbCtx1);
+
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    persistDataStage.process(event);
+
+    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
+    wfCtx.setJobState(testJobPrefix + "0", TaskState.FAILED);
+    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
+
+    wfCtx.getJobStates().remove(testJobPrefix + "1");
+    taskDataCache.removeContext(testJobPrefix + "1");
+
+    JobContext jbCtx2 = new JobContext(new ZNRecord(testJobPrefix + "2"));
+    jbCtx2.setName(testJobPrefix + "2");
+    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
+    wfCtx.setJobState(testJobPrefix + "2", TaskState.IN_PROGRESS);
+    taskDataCache.updateJobContext(testJobPrefix + "2", jbCtx2);
+
+    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
+    persistDataStage.process(event);
+
+    Assert.assertEquals(_driver.getWorkflowContext(testWorkflow), wfCtx);
+    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "0"), jbCtx0);
+    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "2"), jbCtx2);
+    Assert.assertNull(_driver.getJobContext(testJobPrefix + "1"));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/923002e8/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 7634616..1887689 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -46,6 +46,7 @@ import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
 import org.apache.helix.controller.stages.task.TaskSchedulingStage;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.JobContext;
@@ -320,6 +321,7 @@ public class TaskTestUtil {
     stages.add(new ResourceComputationStage());
     stages.add(new CurrentStateComputationStage());
     stages.add(new TaskSchedulingStage());
+    stages.add(new TaskPersistDataStage());
     stages.add(new TaskGarbageCollectionStage());
 
     for (Stage stage : stages) {