Posted to commits@helix.apache.org by jx...@apache.org on 2020/07/17 19:07:32 UTC

[helix] branch master updated: Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fcfa37f  Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)
fcfa37f is described below

commit fcfa37f199d587f3043f4fb9d1d289412c2f6750
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri Jul 17 12:06:08 2020 -0700

    Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)
    
    This reverts commit c7a97bdce66b21dab5522a4f100d08054ada99c2.
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  93 ++---------
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 +++++++++------------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 ++
 .../helix/controller/stages/TestTaskStage.java     |  89 ++---------
 .../task/TestWorkflowContextWithoutConfig.java     |  66 --------
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 -----------
 7 files changed, 112 insertions(+), 424 deletions(-)
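For context while reading the hunks below: both the reverted and the restored purge paths treat a job as expired once it is COMPLETED and its recorded finish time plus the configured expiry has elapsed. The following is a minimal, self-contained sketch of that condition only; it uses plain long parameters as stand-ins for the real JobConfig.getExpiry() and JobContext.getFinishTime() accessors, and a local UNFINISHED sentinel that plays the role of WorkflowContext.UNFINISHED (an unset finish time). It is illustrative, not the committed code.

    // Sketch of the job-expiry condition used by the purge logic in the diff below.
    public class JobExpiryCheck {
      // Stand-in for WorkflowContext.UNFINISHED: marks a job with no recorded finish time.
      private static final long UNFINISHED = -1L;

      // A job is purgeable when it completed, has a real finish time, and
      // finishTime + expiry is already in the past.
      static boolean isExpired(boolean completed, long finishTimeMs, long expiryMs, long nowMs) {
        return completed
            && finishTimeMs != UNFINISHED
            && nowMs >= finishTimeMs + expiryMs;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Finished 10 seconds ago with a 5-second expiry: eligible for purge.
        System.out.println(isExpired(true, now - 10_000L, 5_000L, now)); // true
        // No finish time recorded yet: never considered expired.
        System.out.println(isExpired(true, UNFINISHED, 5_000L, now));    // false
      }
    }

The revert below moves where this check runs (back into the synchronous execute() path via TaskUtil.purgeExpiredJobs) but does not change the condition itself.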

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 557bb20..0af0ee5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,9 +40,5 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_WORKFLOWS,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_JOBS_MAP
+  STATEFUL_REBALANCER
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index f965a3e..3f8e744 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,9 +1,6 @@
 package org.apache.helix.controller.stages;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -13,12 +10,9 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -29,87 +23,34 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
-    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
-    // This is to avoid race conditions since the cache will be modified. After this work, then the
-    // async work will happen.
+  public void execute(ClusterEvent event) {
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
+
+    if (dataProvider == null || manager == null) {
       LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
-    Set<String> workflowsToBePurged = new HashSet<>();
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
-    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
-      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+    Set<WorkflowConfig> existingWorkflows =
+        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+    for (WorkflowConfig workflowConfig : existingWorkflows) {
+      // clean up the expired jobs if it is a queue.
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
-        long purgeInterval = workflowConfig.getJobPurgeInterval();
-        long currentTime = System.currentTimeMillis();
-        if (purgeInterval > 0
-            && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-          // Find jobs that are ready to be purged
-          Set<String> expiredJobs =
-              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
-          if (!expiredJobs.isEmpty()) {
-            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
-          }
-          scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,
-              _rebalanceScheduler, manager);
+        try {
+          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+              _rebalanceScheduler);
+        } catch (Exception e) {
+          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+              workflowConfig.getWorkflowId(), e.toString()));
         }
-      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
-          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
-        // Find workflows that need to be purged
-        workflowsToBePurged.add(entry.getKey());
-      }
-    }
-    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
-        Collections.unmodifiableMap(expiredJobsMap));
-    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
-        Collections.unmodifiableSet(workflowsToBePurged));
-
-    super.process(event);
-  }
-
-  @Override
-  public void execute(ClusterEvent event) {
-    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
-      LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
-          event.getEventId(), event.getEventType(), event.getClusterName());
-      return;
-    }
-
-    Map<String, Set<String>> expiredJobsMap =
-        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
-    Set<String> toBePurgedWorkflows =
-        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
-
-    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
-      try {
-        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
-            _rebalanceScheduler);
-      } catch (Exception e) {
-        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
       }
     }
 
-    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
-  }
-
-  private static void scheduleNextJobPurge(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
-    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ec8d216..917a69b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,8 +735,20 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-        if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        if (jobConfig == null) {
+          LOG.error(String.format(
+              "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+              job, workflowConfig.getWorkflowId()));
+          // Add the job name to expiredJobs so that purge operation will be tried again on this job
           expiredJobs.add(job);
+          continue;
+        }
+        long expiry = jobConfig.getExpiry();
+        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+          if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+              && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+            expiredJobs.add(job);
+          }
         }
       }
     }
@@ -744,52 +756,6 @@ public class TaskUtil {
   }
 
   /**
-   * Based on a workflow's config or context, create a set of jobs that are either expired, which
-   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
-   * meaning that the job might have been deleted manually from the a job queue, or is left in the
-   * DAG due to a failed clean-up attempt from last purge. The difference between this function and
-   * getExpiredJobs() is that this function gets JobConfig and JobContext from a
-   * WorkflowControllerDataProvider instead of Zk.
-   * @param workflowControllerDataProvider
-   * @param workflowConfig
-   * @param workflowContext
-   * @return
-   */
-  public static Set<String> getExpiredJobsFromCache(
-      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    Set<String> expiredJobs = new HashSet<>();
-    Map<String, TaskState> jobStates = workflowContext.getJobStates();
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
-      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
-      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
-        expiredJobs.add(job);
-      }
-    }
-    return expiredJobs;
-  }
-
-  /*
-   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
-   * is null. That happens when a job might have been deleted manually from the a job queue, or is
-   * left in the DAG due to a failed clean-up attempt from last purge.
-   */
-  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
-      TaskState jobState) {
-    if (jobConfig == null) {
-      LOG.warn(
-          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
-          jobName);
-      return true;
-    }
-    long expiry = jobConfig.getExpiry();
-    return jobContext != null && jobState == TaskState.COMPLETED
-        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
-  }
-
-  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -1011,71 +977,72 @@ public class TaskUtil {
   }
 
   /**
-   * Clean up all jobs that are marked as expired.
+   * Clean up all jobs that are COMPLETED and passes its expiry time.
+   * @param workflowConfig
+   * @param workflowContext
    */
-  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
-      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
-    Set<String> failedJobRemovals = new HashSet<>();
-    for (String job : expiredJobs) {
-      if (!TaskUtil
-          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
-        failedJobRemovals.add(job);
-        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
-      }
-      rebalanceScheduler.removeScheduledRebalance(job);
+  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, HelixManager manager,
+      RebalanceScheduler rebalanceScheduler) {
+    if (workflowContext == null) {
+      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+      return;
     }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
+    final Set<String> expiredJobs = Sets.newHashSet();
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
+          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
+        for (String job : expiredJobs) {
+          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+              job)) {
+            failedJobRemovals.add(job);
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          rebalanceScheduler.removeScheduledRebalance(job);
+        }
 
-    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-    // removal will be tried again at next purge
-    expiredJobs.removeAll(failedJobRemovals);
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
 
-    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
-      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
-          workflow);
-    }
+        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
+            true)) {
+          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
+              + " from the workflow " + workflow);
+        }
 
-    if (expiredJobs.size() > 0) {
-      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-      // concurrent write issue. It is possible that jobs got purged but there is no event to
-      // trigger the pipeline to clean context.
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      List<String> resourceConfigs =
-          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-      if (resourceConfigs.size() > 0) {
-        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-      } else {
-        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
-            expiredJobs);
+        if (expiredJobs.size() > 0) {
+          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+          // concurrent write issue. It is possible that jobs got purged but there is no event to
+          // trigger the pipeline to clean context.
+          HelixDataAccessor accessor = manager.getHelixDataAccessor();
+          List<String> resourceConfigs =
+              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+          if (resourceConfigs.size() > 0) {
+            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+          } else {
+            LOG.warn(
+                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+          }
+        }
       }
     }
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  /**
-   * The function that removes IdealStates and workflow contexts of the workflows that need to be
-   * deleted.
-   * @param toBePurgedWorkflows
-   * @param manager
-   */
-  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
-      final HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
-
-    for (String workflowName : toBePurgedWorkflows) {
-      LOG.warn(
-          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
-          workflowName);
-
-      // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
-      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
-            workflowName);
-        continue;
-      }
-
-      if (!removeWorkflowContext(propertyStore, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
-      }
+  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long nextPurgeTime = currentTime + purgeInterval;
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 8c49e1e..89e6d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,6 +356,20 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index db27355..fefc737 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -1,16 +1,8 @@
 package org.apache.helix.controller.stages;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.helix.AccessOption;
-import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.common.DedupEventProcessor;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-import org.apache.helix.controller.pipeline.AsyncWorkerType;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -74,7 +66,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -133,34 +125,15 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
+  public void testPartialDataPurge() {
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
+    garbageCollectionStage.execute(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -169,41 +142,6 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
-  @Test(dependsOnMethods = "testPartialDataPurge")
-  public void testWorkflowGarbageCollection() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
-    String zkPath =
-        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
-    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
-
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
-    // Then garbage collect workflow
-    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
-
-    // Check that IS and contexts have been purged for the workflow
-    checkForIdealStateAndContextRemoval(_testWorkflow);
-
-    worker.shutdown();
-  }
-
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -212,23 +150,16 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+    // IdealState
+    Assert.assertFalse(
+        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
     // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-
-    Assert.assertTrue(TestHelper.verify(
-        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
-            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
-            .exists(newPath, AccessOption.PERSISTENT), 120000));
-  }
-
-  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
-    Assert.assertTrue(TestHelper.verify(() ->
-            !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
-                && !_baseAccessor
-                .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
-        120000));
+    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 84df546..6c29f3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,72 +103,6 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
-  @Test
-  public void testWorkflowContextGarbageCollection() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
-    _driver.start(builder1.build());
-
-    // Wait until workflow is created and IN_PROGRESS state.
-    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
-
-    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
-    // workflow
-    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
-    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
-    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
-
-    String workflowContextPath =
-        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
-
-    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // Wait until workflow is completed.
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-
-    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
-    // expired.
-    boolean workflowExpired = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
-      return (wCtx == null && wCfg == null && idealState == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(workflowExpired);
-
-    _controller.syncStop();
-
-    // Write workflow context to ZooKeeper
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
-        AccessOption.PERSISTENT);
-
-    // Verify context is written back to ZK.
-    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // Create and start new workflow just to make sure controller is running and new workflow is
-    // scheduled successfully.
-    String workflowName2 = TestHelper.getTestMethodName() + "_2";
-    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
-    _driver.start(builder2.build());
-    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
-
-    // Verify that WorkflowContext will be deleted
-    boolean contextDeleted = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      return (wCtx == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(contextDeleted);
-  }
-
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
deleted file mode 100644
index 56e756d..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTaskUtil extends TaskTestBase {
-
-  @Test
-  public void testGetExpiredJobsFromCache() {
-    String workflowName = "TEST_WORKFLOW";
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
-
-    JobConfig.Builder jobBuilder_0 =
-        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_1 =
-        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_2 =
-        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_3 =
-        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    Workflow jobQueue =
-        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
-            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
-
-    WorkflowContext workflowContext = mock(WorkflowContext.class);
-    Map<String, TaskState> jobStates = new HashMap<>();
-    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
-    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
-    when(workflowContext.getJobStates()).thenReturn(jobStates);
-
-    JobConfig jobConfig = mock(JobConfig.class);
-    WorkflowControllerDataProvider workflowControllerDataProvider =
-        mock(WorkflowControllerDataProvider.class);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
-        .thenReturn(jobConfig);
-
-    JobContext jobContext = mock(JobContext.class);
-    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
-
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
-        .thenReturn(jobContext);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
-        .thenReturn(jobContext);
-
-    Set<String> expectedJobs = new HashSet<>();
-    expectedJobs.add(workflowName + "_Job_0");
-    expectedJobs.add(workflowName + "_Job_3");
-    Assert.assertEquals(TaskUtil
-        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
-            workflowContext), expectedJobs);
-  }
-}