Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:45 UTC

[helix] 09/10: Recover Workflow GC Logic (#1181)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3501111a3b9cae27bb7b875fadbf49406a4b6cd0
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Wed Jul 29 10:32:59 2020 -0700

    Recover Workflow GC Logic (#1181)
    
    Recover Workflow Garbage Collection Logic
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  95 +++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 ++++++++++++---------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 --
 .../helix/controller/stages/TestTaskStage.java     |  89 +++++++++--
 .../integration/task/TestJobQueueCleanUp.java      |  19 ++-
 .../task/TestWorkflowContextWithoutConfig.java     |  66 ++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 +++++++++++
 8 files changed, 438 insertions(+), 119 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 589988f..9a0bbb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,5 +40,9 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER
+  STATEFUL_REBALANCER,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  TO_BE_PURGED_WORKFLOWS,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  TO_BE_PURGED_JOBS_MAP
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 7eb0db9..915cba1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -19,7 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -29,9 +32,12 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -42,34 +48,89 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Use the main thread to compute which jobs need to be purged and which workflows need to be
+    // garbage collected. Doing this here avoids race conditions, since the cache may be modified;
+    // the async work happens afterwards on the snapshot computed below.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBePurged = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
+        if (workflowContext == null) {
+          continue;
         }
+        long purgeInterval = workflowConfig.getJobPurgeInterval();
+        long currentTime = System.currentTimeMillis();
+        long nextPurgeTime = workflowContext.getLastJobPurgeTime() + purgeInterval;
+        if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
+          nextPurgeTime = currentTime + purgeInterval;
+          // Find jobs that are ready to be purged
+          Set<String> expiredJobs =
+              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+          if (!expiredJobs.isEmpty()) {
+            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+          }
+        }
+        scheduleNextJobPurge(workflowConfig.getWorkflowId(), nextPurgeTime, _rebalanceScheduler,
+            manager);
+      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
+          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+        // Find workflows that need to be purged
+        workflowsToBePurged.add(entry.getKey());
       }
     }
+    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
+        Collections.unmodifiableMap(expiredJobsMap));
+    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
+        Collections.unmodifiableSet(workflowsToBePurged));
+
+    super.process(event);
+  }
 
+  @Override
+  public void execute(ClusterEvent event) {
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    if (manager == null) {
+      LOG.warn(
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
+          event.getEventId(), event.getEventType(), event.getClusterName());
+      return;
+    }
+
+    Map<String, Set<String>> expiredJobsMap =
+        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
+    Set<String> toBePurgedWorkflows =
+        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+
+    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
+      try {
+        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager, _rebalanceScheduler);
+      } catch (Exception e) {
+        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
+      }
+    }
+
+    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+  }
+
+  private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+    }
   }
 }
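
The new stage splits garbage collection into two phases: process() runs on the
controller pipeline thread and snapshots the purge targets out of the cache into
immutable event attributes (TO_BE_PURGED_JOBS_MAP and TO_BE_PURGED_WORKFLOWS),
and execute() later runs on the async worker against that snapshot only, so a
concurrent cache refresh cannot race with the purge. A minimal, self-contained
sketch of that compute-then-hand-off pattern (hypothetical names, not the Helix
API) could look like this:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch only: the pipeline thread copies the mutable cache into an immutable
    // snapshot, and only that snapshot crosses the thread boundary to the worker.
    public class PurgeHandOffSketch {
      private final ExecutorService asyncWorker = Executors.newSingleThreadExecutor();

      // Runs on the main pipeline thread: safe to read the live cache here.
      public void process(Map<String, Set<String>> expiredJobsByWorkflow) {
        Map<String, Set<String>> snapshot = new HashMap<>();
        expiredJobsByWorkflow.forEach((wf, jobs) -> snapshot.put(wf, new HashSet<>(jobs)));
        Map<String, Set<String>> immutable = Collections.unmodifiableMap(snapshot);
        asyncWorker.submit(() -> execute(immutable)); // hand off the snapshot only
      }

      // Runs on the async worker: never touches the live cache, so later cache
      // refreshes cannot race with the purge.
      private void execute(Map<String, Set<String>> toPurge) {
        toPurge.forEach((wf, jobs) -> System.out.println("purging " + jobs + " from " + wf));
      }
    }
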
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index e11f49e..e6e792f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,20 +735,8 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-        if (jobConfig == null) {
-          LOG.error(String.format(
-              "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
-              job, workflowConfig.getWorkflowId()));
-          // Add the job name to expiredJobs so that purge operation will be tried again on this job
+        if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
           expiredJobs.add(job);
-          continue;
-        }
-        long expiry = jobConfig.getExpiry();
-        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
-          if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-              && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
-            expiredJobs.add(job);
-          }
         }
       }
     }
@@ -756,6 +744,52 @@ public class TaskUtil {
   }
 
   /**
+   * Based on a workflow's config or context, create a set of jobs that are either expired, which
+   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
+   * meaning that the job might have been deleted manually from a job queue, or is left in the
+   * DAG due to a failed clean-up attempt from last purge. The difference between this function and
+   * getExpiredJobs() is that this function gets JobConfig and JobContext from a
+   * WorkflowControllerDataProvider instead of Zk.
+   * @param workflowControllerDataProvider
+   * @param workflowConfig
+   * @param workflowContext
+   * @return
+   */
+  public static Set<String> getExpiredJobsFromCache(
+      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    Set<String> expiredJobs = new HashSet<>();
+    Map<String, TaskState> jobStates = workflowContext.getJobStates();
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
+      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        expiredJobs.add(job);
+      }
+    }
+    return expiredJobs;
+  }
+
+  /*
+   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
+   * is null. That happens when a job might have been deleted manually from a job queue, or is
+   * left in the DAG due to a failed clean-up attempt from last purge.
+   */
+  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
+      TaskState jobState) {
+    if (jobConfig == null) {
+      LOG.warn(
+          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
+          jobName);
+      return true;
+    }
+    long expiry = jobConfig.getExpiry();
+    return jobContext != null && jobState == TaskState.COMPLETED
+        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+  }
+
+  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -977,72 +1011,71 @@ public class TaskUtil {
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
+          workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+      // concurrent write issue. It is possible that jobs got purged but there is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
+            expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+  /**
+   * The function that removes IdealStates and workflow contexts of the workflows that need to be
+   * deleted.
+   * @param toBePurgedWorkflows
+   * @param manager
+   */
+  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
+      final HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+    for (String workflowName : toBePurgedWorkflows) {
+      LOG.warn(
+          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
+          workflowName);
+
+      // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
+      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+            workflowName);
+        continue;
+      }
+
+      if (!removeWorkflowContext(propertyStore, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
+      }
     }
   }
 }
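
The consolidated isJobExpired() predicate used above treats a job as purgeable
either when its JobConfig is missing from the DAG, or when it is COMPLETED and
its finish time plus configured expiry has passed. A simplified, standalone
sketch of that check (not the Helix implementation; UNFINISHED is assumed here
to be a -1 sentinel mirroring WorkflowContext.UNFINISHED):

    // Sketch of the expiry predicate: a job is purgeable if its config is gone
    // (deleted manually or left over from a failed purge), or if it COMPLETED
    // and finishTime + expiry has passed.
    final class ExpirySketch {
      static final long UNFINISHED = -1L; // assumed sentinel for "not finished"

      static boolean isExpired(boolean configMissing, boolean completed,
          long finishTimeMs, long expiryMs, long nowMs) {
        if (configMissing) {
          return true; // retry removal of a dangling DAG node
        }
        return completed && finishTimeMs != UNFINISHED && nowMs >= finishTimeMs + expiryMs;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Completed 10s ago with a 5s expiry -> purgeable.
        System.out.println(isExpired(false, true, now - 10_000, 5_000, now)); // true
        // Completed 1s ago with a 5s expiry -> not yet.
        System.out.println(isExpired(false, true, now - 1_000, 5_000, now));  // false
      }
    }
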
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 53be558..4c9bd18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,20 +356,6 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
-    // Set the job configuration
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    HelixProperty resourceConfig = new HelixProperty(jobResource);
-    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
-    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
-    if (taskConfigMap != null) {
-      for (TaskConfig taskConfig : taskConfigMap.values()) {
-        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
-      }
-    }
-    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 99e227c..43fda00 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -19,9 +19,17 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -85,7 +93,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -144,15 +152,34 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() {
+  public void testPartialDataPurge() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
+    // Manually refresh because there's no controller notify data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.execute(_event);
+    garbageCollectionStage.process(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -161,6 +188,41 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
+  @Test(dependsOnMethods = "testPartialDataPurge")
+  public void testWorkflowGarbageCollection() throws Exception {
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+            AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            event.run();
+          }
+        };
+    worker.start();
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
+    String zkPath =
+        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
+    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
+
+    // Manually refresh because there's no controller notify data change
+    BaseControllerDataProvider dataProvider =
+        _event.getAttribute(AttributeName.ControllerDataProvider.name());
+    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    dataProvider.refresh(_manager.getHelixDataAccessor());
+
+    // Then garbage collect workflow
+    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+    garbageCollectionStage.process(_event);
+
+    // Check that IS and contexts have been purged for the workflow
+    checkForIdealStateAndContextRemoval(_testWorkflow);
+
+    worker.shutdown();
+  }
+
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -169,16 +231,23 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
-    // IdealState
-    Assert.assertFalse(
-        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
-
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
     // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+
+    Assert.assertTrue(TestHelper.verify(
+        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
+            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
+            .exists(newPath, AccessOption.PERSISTENT), 120000));
+  }
+
+  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
+    Assert.assertTrue(TestHelper.verify(() ->
+            !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
+                && !_baseAccessor
+                .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
+        120000));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index daa9b4b..ba4fb44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -80,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
   }
 
   @Test
-  public void testJobQueueAutoCleanUp() throws InterruptedException {
+  public void testJobQueueAutoCleanUp() throws Exception {
     int capacity = 10;
     String queueName = TestHelper.getTestMethodName();
     JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
@@ -105,14 +105,19 @@ public class TestJobQueueCleanUp extends TaskTestBase {
     }
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
-    Thread.sleep(2000);
 
-    WorkflowConfig config = _driver.getWorkflowConfig(queueName);
-    Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+    Assert
+        .assertTrue(TestHelper.verify(() -> {
+          WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+          System.out.println("|Current time: " + System.currentTimeMillis() +" **TEST: " + config.getJobDag().getAllNodes());
+          return config.getJobDag().getAllNodes().equals(remainJobs);
+        }, TestHelper.WAIT_DURATION));
 
-    WorkflowContext context = _driver.getWorkflowContext(queueName);
-    Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
-    Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+    Assert.assertTrue(TestHelper.verify(() -> {
+      WorkflowContext context = _driver.getWorkflowContext(queueName);
+      return context.getJobStates().keySet().equals(remainJobs) && remainJobs
+          .containsAll(context.getJobStartTimes().keySet());
+    }, TestHelper.WAIT_DURATION));
 
     for (String job : deletedJobs) {
       JobConfig cfg = _driver.getJobConfig(job);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 70dd33f..31010cf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,6 +103,72 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
+  @Test
+  public void testWorkflowContextGarbageCollection() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+    _driver.start(builder1.build());
+
+    // Wait until workflow is created and IN_PROGRESS state.
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
+    // workflow
+    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
+
+    String workflowContextPath =
+        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
+
+    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // Wait until workflow is completed.
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
+    // expired.
+    boolean workflowExpired = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
+      return (wCtx == null && wCfg == null && idealState == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(workflowExpired);
+
+    _controller.syncStop();
+
+    // Write workflow context to ZooKeeper
+    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
+        AccessOption.PERSISTENT);
+
+    // Verify context is written back to ZK.
+    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Create and start new workflow just to make sure controller is running and new workflow is
+    // scheduled successfully.
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+    _driver.start(builder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    // Verify that WorkflowContext will be deleted
+    boolean contextDeleted = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      return (wCtx == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(contextDeleted);
+  }
+
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..56e756d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskUtil extends TaskTestBase {
+
+  @Test
+  public void testGetExpiredJobsFromCache() {
+    String workflowName = "TEST_WORKFLOW";
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
+
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    JobConfig.Builder jobBuilder_3 =
+        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
+            .setExpiry(1L);
+    Workflow jobQueue =
+        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    Map<String, TaskState> jobStates = new HashMap<>();
+    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
+    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
+    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
+    when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+    JobConfig jobConfig = mock(JobConfig.class);
+    WorkflowControllerDataProvider workflowControllerDataProvider =
+        mock(WorkflowControllerDataProvider.class);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_0")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
+        .thenReturn(jobConfig);
+    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
+        .thenReturn(jobConfig);
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
+
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+        .thenReturn(jobContext);
+    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+        .thenReturn(jobContext);
+
+    Set<String> expectedJobs = new HashSet<>();
+    expectedJobs.add(workflowName + "_Job_0");
+    expectedJobs.add(workflowName + "_Job_3");
+    Assert.assertEquals(TaskUtil
+        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
+            workflowContext), expectedJobs);
+  }
+}