Posted to commits@helix.apache.org by jx...@apache.org on 2020/07/17 19:00:41 UTC

[helix] branch revert-1076-nealsun/recover-tf-garbage-collection created (now 7313667)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a change to branch revert-1076-nealsun/recover-tf-garbage-collection
in repository https://gitbox.apache.org/repos/asf/helix.git.


      at 7313667  Revert "Recover Workflow Garbage Collection Logic (#1076)"

This branch includes the following new commits:

     new 7313667  Revert "Recover Workflow Garbage Collection Logic (#1076)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[helix] 01/01: Revert "Recover Workflow Garbage Collection Logic (#1076)"

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch revert-1076-nealsun/recover-tf-garbage-collection
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 73136670815469898b901c5b69b1895666f136d8
Author: Junkai Xue <ju...@gmail.com>
AuthorDate: Fri Jul 17 12:00:31 2020 -0700

    Revert "Recover Workflow Garbage Collection Logic (#1076)"
    
    This reverts commit c7a97bdce66b21dab5522a4f100d08054ada99c2.
---
 .../helix/controller/stages/AttributeName.java     |   6 +-
 .../stages/TaskGarbageCollectionStage.java         |  93 ++---------
 .../main/java/org/apache/helix/task/TaskUtil.java  | 173 +++++++++------------
 .../org/apache/helix/task/WorkflowDispatcher.java  |  14 ++
 .../helix/controller/stages/TestTaskStage.java     |  89 ++---------
 .../task/TestWorkflowContextWithoutConfig.java     |  66 --------
 .../java/org/apache/helix/task/TestTaskUtil.java   |  95 -----------
 7 files changed, 112 insertions(+), 424 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 557bb20..0af0ee5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,9 +40,5 @@ public enum AttributeName {
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_WORKFLOWS,
-  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_JOBS_MAP
+  STATEFUL_REBALANCER
 }
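
The two TO_BE_PURGED_* constants removed here existed only so that TaskGarbageCollectionStage could hand purge targets from the synchronous pipeline pass to its async worker; after the revert the enum again ends at STATEFUL_REBALANCER. For orientation, a minimal sketch of how stages resolve these constants into event attributes (both lookups appear verbatim in the stage diff below; the surrounding stage code is elided):

    // An AttributeName's name() is the key under which the controller pipeline stores
    // the matching attribute on a ClusterEvent; stages read attributes back like this.
    WorkflowControllerDataProvider dataProvider =
        event.getAttribute(AttributeName.ControllerDataProvider.name());
    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
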
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index f965a3e..3f8e744 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,9 +1,6 @@
 package org.apache.helix.controller.stages;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixManager;
@@ -13,12 +10,9 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -29,87 +23,34 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
-    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
-    // This is to avoid race conditions since the cache will be modified. After this work, then the
-    // async work will happen.
+  public void execute(ClusterEvent event) {
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
+
+    if (dataProvider == null || manager == null) {
       LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
-    Set<String> workflowsToBePurged = new HashSet<>();
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
-    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
-      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+    Set<WorkflowConfig> existingWorkflows =
+        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+    for (WorkflowConfig workflowConfig : existingWorkflows) {
+      // clean up the expired jobs if it is a queue.
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
-        long purgeInterval = workflowConfig.getJobPurgeInterval();
-        long currentTime = System.currentTimeMillis();
-        if (purgeInterval > 0
-            && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-          // Find jobs that are ready to be purged
-          Set<String> expiredJobs =
-              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
-          if (!expiredJobs.isEmpty()) {
-            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
-          }
-          scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,
-              _rebalanceScheduler, manager);
+        try {
+          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+              _rebalanceScheduler);
+        } catch (Exception e) {
+          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+              workflowConfig.getWorkflowId(), e.toString()));
         }
-      } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
-          .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
-        // Find workflows that need to be purged
-        workflowsToBePurged.add(entry.getKey());
-      }
-    }
-    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
-        Collections.unmodifiableMap(expiredJobsMap));
-    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
-        Collections.unmodifiableSet(workflowsToBePurged));
-
-    super.process(event);
-  }
-
-  @Override
-  public void execute(ClusterEvent event) {
-    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    if (manager == null) {
-      LOG.warn(
-          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
-          event.getEventId(), event.getEventType(), event.getClusterName());
-      return;
-    }
-
-    Map<String, Set<String>> expiredJobsMap =
-        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
-    Set<String> toBePurgedWorkflows =
-        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
-
-    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
-      try {
-        TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
-            _rebalanceScheduler);
-      } catch (Exception e) {
-        LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
       }
     }
 
-    TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
-  }
-
-  private static void scheduleNextJobPurge(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
-    }
   }
 }
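
Net effect of this file's change: the two-phase flow, in which process() computed purge targets on the pipeline thread and handed them to the async execute() through event attributes to avoid racing on the cache, collapses back into a single execute() that reads configs from the data provider and calls TaskUtil.purgeExpiredJobs for each non-terminable workflow. A minimal sketch of the hand-off being removed (attribute names, collection types, and calls taken from the deleted lines above; the rest of the stage is elided):

    // process(), on the pipeline thread: compute targets against the cache and publish
    // them as unmodifiable collections so the async worker cannot mutate shared state.
    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
        Collections.unmodifiableMap(expiredJobsMap));
    event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
        Collections.unmodifiableSet(workflowsToBePurged));

    // execute(), on the async worker: read the targets back and act on ZK only.
    Map<String, Set<String>> expiredJobsByWorkflow =
        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
    Set<String> workflowsToGc =
        event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
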
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ec8d216..917a69b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,8 +735,20 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
-        if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+        if (jobConfig == null) {
+          LOG.error(String.format(
+              "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+              job, workflowConfig.getWorkflowId()));
+          // Add the job name to expiredJobs so that purge operation will be tried again on this job
           expiredJobs.add(job);
+          continue;
+        }
+        long expiry = jobConfig.getExpiry();
+        if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+          if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+              && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+            expiredJobs.add(job);
+          }
         }
       }
     }
@@ -744,52 +756,6 @@ public class TaskUtil {
   }
 
   /**
-   * Based on a workflow's config or context, create a set of jobs that are either expired, which
-   * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
-   * meaning that the job might have been deleted manually from the a job queue, or is left in the
-   * DAG due to a failed clean-up attempt from last purge. The difference between this function and
-   * getExpiredJobs() is that this function gets JobConfig and JobContext from a
-   * WorkflowControllerDataProvider instead of Zk.
-   * @param workflowControllerDataProvider
-   * @param workflowConfig
-   * @param workflowContext
-   * @return
-   */
-  public static Set<String> getExpiredJobsFromCache(
-      WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    Set<String> expiredJobs = new HashSet<>();
-    Map<String, TaskState> jobStates = workflowContext.getJobStates();
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
-      JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
-      if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
-        expiredJobs.add(job);
-      }
-    }
-    return expiredJobs;
-  }
-
-  /*
-   * Checks if a job is expired and should be purged. This includes a special case when jobConfig
-   * is null. That happens when a job might have been deleted manually from the a job queue, or is
-   * left in the DAG due to a failed clean-up attempt from last purge.
-   */
-  private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
-      TaskState jobState) {
-    if (jobConfig == null) {
-      LOG.warn(
-          "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
-          jobName);
-      return true;
-    }
-    long expiry = jobConfig.getExpiry();
-    return jobContext != null && jobState == TaskState.COMPLETED
-        && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
-        && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
-  }
-
-  /**
    * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
    * @param accessor
    * @param propertyStore
@@ -1011,71 +977,72 @@ public class TaskUtil {
   }
 
   /**
-   * Clean up all jobs that are marked as expired.
+   * Clean up all jobs that are COMPLETED and passes its expiry time.
+   * @param workflowConfig
+   * @param workflowContext
    */
-  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
-      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
-    Set<String> failedJobRemovals = new HashSet<>();
-    for (String job : expiredJobs) {
-      if (!TaskUtil
-          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
-        failedJobRemovals.add(job);
-        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
-      }
-      rebalanceScheduler.removeScheduledRebalance(job);
+  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, HelixManager manager,
+      RebalanceScheduler rebalanceScheduler) {
+    if (workflowContext == null) {
+      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+      return;
     }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
+    final Set<String> expiredJobs = Sets.newHashSet();
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
+          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
+        for (String job : expiredJobs) {
+          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+              job)) {
+            failedJobRemovals.add(job);
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          rebalanceScheduler.removeScheduledRebalance(job);
+        }
 
-    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-    // removal will be tried again at next purge
-    expiredJobs.removeAll(failedJobRemovals);
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
 
-    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
-      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
-          workflow);
-    }
+        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
+            true)) {
+          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
+              + " from the workflow " + workflow);
+        }
 
-    if (expiredJobs.size() > 0) {
-      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-      // concurrent write issue. It is possible that jobs got purged but there is no event to
-      // trigger the pipeline to clean context.
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      List<String> resourceConfigs =
-          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-      if (resourceConfigs.size() > 0) {
-        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-      } else {
-        LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
-            expiredJobs);
+        if (expiredJobs.size() > 0) {
+          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+          // concurrent write issue. It is possible that jobs got purged but there is no event to
+          // trigger the pipeline to clean context.
+          HelixDataAccessor accessor = manager.getHelixDataAccessor();
+          List<String> resourceConfigs =
+              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+          if (resourceConfigs.size() > 0) {
+            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+          } else {
+            LOG.warn(
+                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+          }
+        }
       }
     }
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  /**
-   * The function that removes IdealStates and workflow contexts of the workflows that need to be
-   * deleted.
-   * @param toBePurgedWorkflows
-   * @param manager
-   */
-  public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
-      final HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
-
-    for (String workflowName : toBePurgedWorkflows) {
-      LOG.warn(
-          "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
-          workflowName);
-
-      // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
-      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
-            workflowName);
-        continue;
-      }
-
-      if (!removeWorkflowContext(propertyStore, workflowName)) {
-        LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
-      }
+  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long nextPurgeTime = currentTime + purgeInterval;
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
 }
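
The expiry rule restored in getExpiredJobs() reads more easily with its two branches folded together. A condensed restatement, not the committed method (names follow the Helix task API used above): a DAG entry is purged when its JobConfig is missing, or when the job is COMPLETED and its recorded finish time plus the configured expiry has passed.

    // Condensed sketch of the restored expiry check; the committed code inlines this
    // inside getExpiredJobs() and also logs the missing-JobConfig case.
    private static boolean shouldPurge(JobConfig jobConfig, JobContext jobContext, TaskState state) {
      if (jobConfig == null) {
        // config deleted manually from the queue, or left in the DAG by a failed purge
        return true;
      }
      return jobContext != null && state == TaskState.COMPLETED
          && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
          && System.currentTimeMillis() >= jobContext.getFinishTime() + jobConfig.getExpiry();
    }
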
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 8c49e1e..89e6d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,6 +356,20 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    // Set the job configuration
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
+    }
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
     // Push out new ideal state based on number of target partitions
     IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
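
The re-added block persists the job's ResourceConfig when the job resource is created. For the shape of what is written (an annotated restatement of the lines above, not new behavior): job-level settings land in the ZNRecord's simple fields, and each TaskConfig becomes one map field keyed by its task id; the committed code additionally guards against a null task map.

    HelixProperty resourceConfig = new HelixProperty(jobResource);
    // job-level settings -> simple fields
    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
    // one map field per task: taskConfig.getId() -> taskConfig.getConfigMap()
    for (TaskConfig taskConfig : jobConfig.getTaskConfigMap().values()) {
      resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
    }
    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
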
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index db27355..fefc737 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -1,16 +1,8 @@
 package org.apache.helix.controller.stages;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.helix.AccessOption;
-import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.common.DedupEventProcessor;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-import org.apache.helix.controller.pipeline.AsyncWorkerType;
-import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -74,7 +66,7 @@ public class TestTaskStage extends TaskTestBase {
         TaskConstants.STATE_MODEL_NAME);
 
     // Create the context
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
     wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
     wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
     wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -133,34 +125,15 @@ public class TestTaskStage extends TaskTestBase {
    * async job purge will try to delete it again.
    */
   @Test(dependsOnMethods = "testPersistContextData")
-  public void testPartialDataPurge() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
+  public void testPartialDataPurge() {
     // Manually delete JobConfig
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
     deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
 
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
     // Then purge jobs
     TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
+    garbageCollectionStage.execute(_event);
 
     // Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
     // IdealState check
@@ -169,41 +142,6 @@ public class TestTaskStage extends TaskTestBase {
     checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
   }
 
-  @Test(dependsOnMethods = "testPartialDataPurge")
-  public void testWorkflowGarbageCollection() throws Exception {
-    DedupEventProcessor<String, Runnable> worker =
-        new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
-            AsyncWorkerType.TaskJobPurgeWorker.name()) {
-          @Override
-          protected void handleEvent(Runnable event) {
-            event.run();
-          }
-        };
-    worker.start();
-    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
-    workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
-    _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
-    String zkPath =
-        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
-    _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
-
-    // Manually refresh because there's no controller notify data change
-    BaseControllerDataProvider dataProvider =
-        _event.getAttribute(AttributeName.ControllerDataProvider.name());
-    dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
-    dataProvider.refresh(_manager.getHelixDataAccessor());
-
-    // Then garbage collect workflow
-    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
-    garbageCollectionStage.process(_event);
-
-    // Check that IS and contexts have been purged for the workflow
-    checkForIdealStateAndContextRemoval(_testWorkflow);
-
-    worker.shutdown();
-  }
-
   private void deleteJobConfigs(String workflowName, String jobName) {
     String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
     String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -212,23 +150,16 @@ public class TestTaskStage extends TaskTestBase {
     _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
   }
 
-  private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+    // IdealState
+    Assert.assertFalse(
+        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
     // JobContexts in old ZNode path
     String oldPath =
         String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
     String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-
-    Assert.assertTrue(TestHelper.verify(
-        () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
-            && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
-            .exists(newPath, AccessOption.PERSISTENT), 120000));
-  }
-
-  private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
-    Assert.assertTrue(TestHelper.verify(() ->
-            !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
-                && !_baseAccessor
-                .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
-        120000));
+    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 84df546..6c29f3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,72 +103,6 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(workflowContextNotCreated);
   }
 
-  @Test
-  public void testWorkflowContextGarbageCollection() throws Exception {
-    String workflowName = TestHelper.getTestMethodName();
-    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
-    _driver.start(builder1.build());
-
-    // Wait until workflow is created and IN_PROGRESS state.
-    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
-
-    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
-    // workflow
-    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
-    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
-    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
-
-    String workflowContextPath =
-        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
-
-    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // Wait until workflow is completed.
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-
-    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
-    // expired.
-    boolean workflowExpired = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
-      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
-      return (wCtx == null && wCfg == null && idealState == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(workflowExpired);
-
-    _controller.syncStop();
-
-    // Write workflow context to ZooKeeper
-    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
-        AccessOption.PERSISTENT);
-
-    // Verify context is written back to ZK.
-    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
-        null, AccessOption.PERSISTENT);
-    Assert.assertNotNull(record);
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // Create and start new workflow just to make sure controller is running and new workflow is
-    // scheduled successfully.
-    String workflowName2 = TestHelper.getTestMethodName() + "_2";
-    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
-    _driver.start(builder2.build());
-    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
-
-    // Verify that WorkflowContext will be deleted
-    boolean contextDeleted = TestHelper.verify(() -> {
-      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
-      return (wCtx == null);
-    }, TestHelper.WAIT_DURATION);
-    Assert.assertTrue(contextDeleted);
-  }
-
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
deleted file mode 100644
index 56e756d..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTaskUtil extends TaskTestBase {
-
-  @Test
-  public void testGetExpiredJobsFromCache() {
-    String workflowName = "TEST_WORKFLOW";
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
-
-    JobConfig.Builder jobBuilder_0 =
-        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_1 =
-        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_2 =
-        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    JobConfig.Builder jobBuilder_3 =
-        new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
-            .setExpiry(1L);
-    Workflow jobQueue =
-        queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
-            .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
-
-    WorkflowContext workflowContext = mock(WorkflowContext.class);
-    Map<String, TaskState> jobStates = new HashMap<>();
-    jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
-    jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
-    jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
-    when(workflowContext.getJobStates()).thenReturn(jobStates);
-
-    JobConfig jobConfig = mock(JobConfig.class);
-    WorkflowControllerDataProvider workflowControllerDataProvider =
-        mock(WorkflowControllerDataProvider.class);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
-        .thenReturn(jobConfig);
-    when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
-        .thenReturn(jobConfig);
-
-    JobContext jobContext = mock(JobContext.class);
-    when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
-
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
-        .thenReturn(jobContext);
-    when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
-        .thenReturn(jobContext);
-
-    Set<String> expectedJobs = new HashSet<>();
-    expectedJobs.add(workflowName + "_Job_0");
-    expectedJobs.add(workflowName + "_Job_3");
-    Assert.assertEquals(TaskUtil
-        .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
-            workflowContext), expectedJobs);
-  }
-}