You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/24 21:33:16 UTC

helix git commit: [HELIX-743] Fix purgeExpiredJobs() so that jobs whose removal has failed do not get removed from DAG

Repository: helix
Updated Branches:
  refs/heads/master 4a121ef26 -> c012c7b9d


[HELIX-743] Fix purgeExpiredJobs() so that jobs whose removal has failed do not get removed from DAG

Previously, even if a job's removal had failed, the Task Framework would still go ahead and remove the job from the DAG. This would cause some ZNodes to be left over, and they would never be cleaned up by later purges.
Changelist:
1. Keep track of jobs whose removal failed and remove them from expiredJobs before the DAG is updated, so that on the next call to purgeExpiredJobs() those jobs are included in expiredJobs again and their removal is retried.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c012c7b9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c012c7b9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c012c7b9

Branch: refs/heads/master
Commit: c012c7b9dea35137935ac29c035a96aea570bf9c
Parents: 4a121ef
Author: Hunter Lee <na...@gmail.com>
Authored: Tue Jul 24 12:20:00 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Tue Jul 24 12:20:00 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/task/WorkflowRebalancer.java  | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c012c7b9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 32a5370..165e61d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -538,18 +538,24 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
       Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(),
           _manager.getHelixPropertyStore(), workflowConfig, workflowContext);
-
       if (expiredJobs.isEmpty()) {
         LOG.info("No job to purge for the queue " + workflow);
       } else {
         LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
         for (String job : expiredJobs) {
           if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
               job)) {
+            failedJobRemovals.add(job);
             LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
           }
           _rebalanceScheduler.removeScheduledRebalance(job);
         }
+
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
+
         if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs,
             true)) {
           LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs