You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/17 22:51:33 UTC

[1/4] helix git commit: Selective update for Resource (include Workflow and Job) Config read.

Repository: helix
Updated Branches:
  refs/heads/master 74145e8ad -> 1ad490ec7


Selective update for Resource (include Workflow and Job) Config read.

For task framework 2.0, we need to try to minimize the impact of data loading from the cache. One of the targets is to reduce the latency of read/write operations for the task framework. In this review, read operations are minimized for resource configs. For workflow and job context, reads are eliminated so access becomes write-only; those changes are in another commit.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fc868b34
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fc868b34
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fc868b34

Branch: refs/heads/master
Commit: fc868b34d60eaba82ad3f9e60004e8cb5db1c9f3
Parents: 74145e8
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Jul 30 10:14:33 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:29:52 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 48 ++++++++++++++++++--
 .../TestClusterDataCacheSelectiveUpdate.java    | 18 ++++++++
 .../apache/helix/tools/TestHelixAdminCli.java   | 18 ++++----
 3 files changed, 71 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 577b2c7..f960603 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,11 +19,13 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,6 +35,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.IdealStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -68,7 +71,7 @@ import static org.apache.helix.HelixConstants.ChangeType;
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
-public class ClusterDataCache {
+public class ClusterDataCache extends AbstractDataCache {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterDataCache.class.getName());
 
   private ClusterConfig _clusterConfig;
@@ -78,7 +81,7 @@ public class ClusterDataCache {
   private Map<String, InstanceConfig> _instanceConfigMap;
   private Map<String, InstanceConfig> _instanceConfigCacheMap;
   private Map<String, Long> _instanceOfflineTimeMap;
-  private Map<String, ResourceConfig> _resourceConfigMap;
+  private Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
   private Map<String, ResourceConfig> _resourceConfigCacheMap;
   private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
@@ -182,8 +185,8 @@ public class ClusterDataCache {
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, false);
       clearCachedResourceAssignments();
-      _resourceConfigCacheMap =
-          accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
+
+      _resourceConfigCacheMap = refreshResourceConfigs(accessor);
       LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet()
           + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
@@ -942,6 +945,43 @@ public class ClusterDataCache {
     return change;
   }
 
+  private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor accessor) {
+    Map<String, ResourceConfig> refreshedResourceConfigs = Maps.newHashMap();
+
+    long startTime = System.currentTimeMillis();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Set<PropertyKey> currentResourceConfigKeys = new HashSet<>();
+    for (String resourceConfig : accessor.getChildNames(keyBuilder.resourceConfigs())) {
+      currentResourceConfigKeys.add(keyBuilder.resourceConfig(resourceConfig));
+    }
+
+    Set<PropertyKey> cachedKeys = new HashSet<>();
+    Map<PropertyKey, ResourceConfig> cachedResourceConfigMap = Maps.newHashMap();
+
+    for (String resourceConfig : _resourceConfigMap.keySet()) {
+      cachedKeys.add(keyBuilder.resourceConfig(resourceConfig));
+      cachedResourceConfigMap
+          .put(keyBuilder.resourceConfig(resourceConfig), _resourceConfigMap.get(resourceConfig));
+    }
+    cachedKeys.retainAll(currentResourceConfigKeys);
+
+    Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys);
+    reloadKeys.removeAll(cachedKeys);
+
+    Map<PropertyKey, ResourceConfig> updatedMap =
+        refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
+            cachedResourceConfigMap);
+    for (ResourceConfig resourceConfig : updatedMap.values()) {
+      refreshedResourceConfigs.put(resourceConfig.getResourceName(), resourceConfig);
+    }
+
+    long endTime = System.currentTimeMillis();
+    LogUtil.logInfo(LOG, getEventId(),
+        "Refresh " + refreshedResourceConfigs.size() + " resource configs for cluster "
+            + _clusterName + ", took " + (endTime - startTime) + " ms");
+    return refreshedResourceConfigs;
+  }
+
   /**
    * toString method to print the entire cluster state
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index a749265..11044b7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -25,8 +25,12 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.MockZkHelixDataAccessor;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -124,5 +128,19 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase
     cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2);
+
+    // Test WorkflowConfig/JobConfigs
+    TaskDriver driver = new TaskDriver(_manager);
+    Workflow.Builder workflow = WorkflowGenerator.generateSingleJobWorkflowBuilder("Job",
+        new JobConfig.Builder().setCommand("ReIndex").setTargetResource("TestDB_2"));
+    driver.start(workflow.build());
+
+    Thread.sleep(100);
+    accessor.clearReadCounters();
+
+    cache.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    cache.refresh(accessor);
+    // 1 Cluster Config change + 2 Resource Config Changes
+    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 3);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index fdab158..27d6dc5 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -105,7 +105,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddCluster")
   public void testAddResource() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -130,7 +130,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddResource")
   public void testAddInstance() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -180,7 +180,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddInstance")
   public void testRebalanceResource() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -211,7 +211,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testRebalanceResource")
   public void testStartCluster() throws Exception {
     final int n = 6;
 
@@ -294,7 +294,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testStartCluster")
   public void testDropAddResource() throws Exception {
     final int n = 6;
 
@@ -398,7 +398,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     Thread.sleep(100);
   }
 
-  @Test
+  @Test (dependsOnMethods = "testDropAddResource")
   public void testInstanceOperations() throws Exception {
     final int n = 6;
 
@@ -469,7 +469,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testInstanceOperations")
   public void testExpandCluster() throws Exception {
     final int n = 6;
 
@@ -520,7 +520,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testExpandCluster")
   public void testDeactivateCluster() throws Exception {
     final int n = 6;
 
@@ -577,7 +577,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testDeactivateCluster")
   public void testInstanceGroupTags() throws Exception {
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);


[3/4] helix git commit: Replace BestPossibleStage with TaskSchedulingStage

Posted by lx...@apache.org.
Replace BestPossibleStage with TaskSchedulingStage

For the new task framework, we don't have to loop through all the IdealStates. For the first step, we can have a simple version of BestPossibleStage.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/50c9aa3a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/50c9aa3a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/50c9aa3a

Branch: refs/heads/master
Commit: 50c9aa3a2eaf82e75743c693b7e40ad741e51198
Parents: c45d3a6
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Jul 17 17:56:36 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:45:45 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   2 +-
 .../stages/BestPossibleStateCalcStage.java      |  57 +-----
 .../controller/stages/TaskSchedulingStage.java  | 204 +++++++++++++++++++
 .../task/TestWorkflowTermination.java           |  26 +--
 4 files changed, 227 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9f94755..48677d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -306,7 +306,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       // TODO: Junkai will work on refactoring existing pipeline log into abstract logic and
       // extend the logic to separate pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
-      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new TaskSchedulingStage());
       rebalancePipeline.addStage(new IntermediateStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index a6ba5b8..5cc9593 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -103,35 +103,25 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
-    PriorityQueue<ResourcePriority> resourcePriorityQueue = new PriorityQueue<>();
-    TaskDriver taskDriver = null;
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
-    if (helixManager != null) {
-      taskDriver = new TaskDriver(helixManager);
-    }
-    for (Resource resource : resourceMap.values()) {
-      resourcePriorityQueue.add(new ResourcePriority(resource, cache.getIdealState(resource.getResourceName()),
-          taskDriver));
-    }
 
     final List<String> failureResources = new ArrayList<>();
-    Iterator<ResourcePriority> itr = resourcePriorityQueue.iterator();
+    Iterator<Resource> itr = resourceMap.values().iterator();
     while (itr.hasNext()) {
-      Resource resource = itr.next().getResource();
+      Resource resource = itr.next();
       if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
         failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId, "Failed to calculate best possible states for " + resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId,
+            "Failed to calculate best possible states for " + resource.getResourceName());
       }
     }
 
     // Check and report if resource rebalance has failure
-    if (!cache.isTaskCache()) {
-      ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor,
-          "Failed to calculate best possible states for " + failureResources.size()
-              + " resources.");
-    }
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor,
+        "Failed to calculate best possible states for " + failureResources.size() + " resources.");
+
     return output;
   }
 
@@ -356,33 +346,4 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     return mappingCalculator;
   }
-
-  class ResourcePriority implements Comparable<ResourcePriority> {
-    final Resource _resource;
-    // By default, non-job resources and new jobs are assigned lowest priority
-    Long _priority = Long.MAX_VALUE;
-
-    Resource getResource() {
-      return _resource;
-    }
-
-    public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver) {
-      _resource = resource;
-
-      if (taskDriver != null && idealState != null
-          && idealState.getRebalancerClassName() != null
-          && idealState.getRebalancerClassName().equals(JobRebalancer.class.getName())) {
-        // Update priority for job resources, note that older jobs will be processed earlier
-        JobContext jobContext = taskDriver.getJobContext(resource.getResourceName());
-        if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED) {
-          _priority = jobContext.getStartTime();
-        }
-      }
-    }
-
-    @Override
-    public int compareTo(ResourcePriority otherJob) {
-      return _priority.compareTo(otherJob._priority);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
new file mode 100644
index 0000000..e2596b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
@@ -0,0 +1,204 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobRebalancer;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskSchedulingStage extends AbstractBaseStage {
+  private static final Logger logger = LoggerFactory.getLogger(TaskSchedulingStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+
+    if (currentStateOutput == null || resourceMap == null || cache == null) {
+      throw new StageException(
+          "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+    }
+
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    cache.resetActiveTaskCount(currentStateOutput);
+
+    final BestPossibleStateOutput bestPossibleStateOutput =
+        compute(event, resourceMap, currentStateOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
+  }
+
+  private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput) {
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+    PriorityQueue<TaskSchedulingStage.ResourcePriority> resourcePriorityQueue =
+        new PriorityQueue<>();
+    TaskDriver taskDriver = null;
+    HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
+    if (helixManager != null) {
+      taskDriver = new TaskDriver(helixManager);
+    }
+    for (Resource resource : resourceMap.values()) {
+      resourcePriorityQueue.add(new TaskSchedulingStage.ResourcePriority(resource,
+          cache.getIdealState(resource.getResourceName()), taskDriver));
+    }
+
+    // TODO: Replace this looping available resources with Workflow Queues
+    for (Iterator<TaskSchedulingStage.ResourcePriority> itr = resourcePriorityQueue.iterator();
+        itr.hasNext(); ) {
+      Resource resource = itr.next().getResource();
+      if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
+        LogUtil
+            .logWarn(logger, _eventId, "Failed to assign tasks for " + resource.getResourceName());
+      }
+    }
+
+    return output;
+  }
+
+
+  private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache,
+      CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
+    // for each ideal state
+    // read the state model def
+    // for each resource
+    // get the preference list
+    // for each instanceName check if its alive then assign a state
+
+    String resourceName = resource.getResourceName();
+    LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName);
+    // Ideal state may be gone. In that case we need to get the state model name
+    // from the current state
+    IdealState idealState = cache.getIdealState(resourceName);
+    if (idealState == null) {
+      // if ideal state is deleted, use an empty one
+      LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore");
+      idealState = new IdealState(resourceName);
+      idealState.setStateModelDefRef(resource.getStateModelDefRef());
+    }
+
+    Rebalancer rebalancer = null;
+    String rebalancerClassName = idealState.getRebalancerClassName();
+    if (rebalancerClassName != null) {
+      if (logger.isDebugEnabled()) {
+        LogUtil.logDebug(logger, _eventId,
+            "resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
+      }
+      try {
+        rebalancer = Rebalancer.class
+            .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+      } catch (Exception e) {
+        LogUtil.logError(logger, _eventId,
+            "Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+      }
+    }
+
+    MappingCalculator mappingCalculator = null;
+    if (rebalancer != null) {
+      try {
+        mappingCalculator = MappingCalculator.class.cast(rebalancer);
+      } catch (ClassCastException e) {
+        LogUtil.logWarn(logger, _eventId,
+            "Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: "
+                + resourceName);
+      }
+    } else {
+      return false;
+    }
+
+    TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
+    taskRebalancer.setClusterStatusMonitor(
+        (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
+
+    ResourceAssignment partitionStateAssignment = null;
+    try {
+      HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+      rebalancer.init(manager);
+
+      // Use the internal MappingCalculator interface to compute the final assignment
+      // The next release will support rebalancers that compute the mapping from start to finish
+        partitionStateAssignment = mappingCalculator
+            .computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+        for (Partition partition : resource.getPartitions()) {
+          Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+          output.setState(resourceName, partition, newStateMap);
+        }
+
+        // Check if calculation is done successfully
+        return true;
+      } catch (Exception e) {
+        LogUtil
+            .logError(logger, _eventId, "Error computing assignment for resource " + resourceName + ". Skipping.", e);
+        // TODO : remove this part after debugging NPE
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(String
+            .format("HelixManager is null : %s\n", event.getAttribute("helixmanager") == null));
+        sb.append(String.format("Rebalancer is null : %s\n", rebalancer == null));
+        sb.append(String.format("Calculated idealState is null : %s\n", idealState == null));
+        sb.append(String.format("MappingCaculator is null : %s\n", mappingCalculator == null));
+        sb.append(
+            String.format("PartitionAssignment is null : %s\n", partitionStateAssignment == null));
+        sb.append(String.format("Output is null : %s\n", output == null));
+
+        LogUtil.logError(logger, _eventId, sb.toString());
+      }
+
+    // Exception or rebalancer is not found
+    return false;
+  }
+
+  class ResourcePriority implements Comparable<ResourcePriority> {
+    final Resource _resource;
+    // By default, non-job resources and new jobs are assigned lowest priority
+    Long _priority = Long.MAX_VALUE;
+
+    Resource getResource() {
+      return _resource;
+    }
+
+    public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver) {
+      _resource = resource;
+
+      if (taskDriver != null && idealState != null
+          && idealState.getRebalancerClassName() != null
+          && idealState.getRebalancerClassName().equals(JobRebalancer.class.getName())) {
+        // Update priority for job resources, note that older jobs will be processed earlier
+        JobContext jobContext = taskDriver.getJobContext(resource.getResourceName());
+        if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED) {
+          _priority = jobContext.getStartTime();
+        }
+      }
+    }
+
+    @Override
+    public int compareTo(ResourcePriority otherJob) {
+      return _priority.compareTo(otherJob._priority);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index f303c52..1a08468 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -130,10 +130,10 @@ public class TestWorkflowTermination extends TaskTestBase {
   public void testWorkflowPausedTimeout() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     long workflowExpiry = 2000; // 2sec expiry time
-    long timeout = 2000;
+    long timeout = 5000;
     String notStartedJobName = JOB_NAME + "-NotStarted";
 
-    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 100);
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 5000);
     jobBuilder.setWorkflow(workflowName);
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(
@@ -151,7 +151,7 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     // Wait a bit for the job to get scheduled. Job runs for 100ms so this will very likely
     // to trigger a job stopped
-    Thread.sleep(40);
+    Thread.sleep(100);
 
     // Pause the queue
     _driver.waitToStop(workflowName, 10000L);
@@ -208,17 +208,17 @@ public class TestWorkflowTermination extends TaskTestBase {
     String job2 = JOB_NAME + "2";
     String job3 = JOB_NAME + "3";
     String job4 = JOB_NAME + "4";
-    long workflowExpiry = 2000;
-    long timeout = 5000;
+    long workflowExpiry = 10000;
+    long timeout = 8000;
 
-    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50);
-    JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 10);
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 1);
+    JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 1);
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(
             new WorkflowConfig.Builder(workflowName)
                 .setWorkFlowType(WORKFLOW_TYPE)
-                .setTimeout(timeout)
+                .setTimeout(timeout).setParallelJobs(4)
                 .setFailureThreshold(1)
                 .build()
         )
@@ -234,26 +234,26 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     _driver.start(workflowBuilder.build());
 
-    _driver.pollForWorkflowState(workflowName, 5000L, TaskState.FAILED);
+    _driver.pollForWorkflowState(workflowName, 10000L, TaskState.FAILED);
 
     // Timeout is longer than fail time, so the failover should occur earlier
     WorkflowContext context = _driver.getWorkflowContext(workflowName);
     Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout);
 
     // job1 will complete
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 10000L,
         TaskState.COMPLETED);
 
     // Possible race between 2 and 3 so it's likely for job2 to stay in either COMPLETED or ABORTED
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job2), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job2), 10000L,
         TaskState.COMPLETED, TaskState.ABORTED);
 
     // job3 meant to fail
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 10000L,
         TaskState.FAILED);
 
     // because job4 has dependency over job3, it will fail as well
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 10000L,
         TaskState.FAILED);
 
     // Check MBean is updated


[2/4] helix git commit: For performance improvement, we need to differentiate the task pipeline and regular pipeline.

Posted by lx...@apache.org.
For performance improvement, we need to differentiate the task pipeline from the regular pipeline.

1. Split the task pipeline out from the regular pipeline.
2. Remove unnecessary stages from the task pipeline that are independent of previous outputs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c45d3a66
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c45d3a66
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c45d3a66

Branch: refs/heads/master
Commit: c45d3a66f55dcddb2b95a1872e76f3030572d2c7
Parents: fc868b3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Jun 7 17:37:54 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:45:04 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 56 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c45d3a66/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8d1e44b..9f94755 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -150,12 +150,12 @@ public class GenericHelixController implements IdealStateChangeListener,
    */
   public GenericHelixController() {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()));
+        createTaskRegistry(PipelineTypes.TASK.name()));
   }
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()), clusterName);
+        createTaskRegistry(PipelineTypes.TASK.name()), clusterName);
   }
 
   class RebalanceTask extends TimerTask {
@@ -281,13 +281,63 @@ public class GenericHelixController implements IdealStateChangeListener,
       registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
       registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.ExternalViewChange, dataRefresh);
       registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       return registry;
     }
   }
 
+  private static PipelineRegistry createTaskRegistry(String pipelineName) {
+    logger.info("createDefaultRegistry");
+    synchronized (GenericHelixController.class) {
+      PipelineRegistry registry = new PipelineRegistry();
+
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // data pre-process pipeline
+      Pipeline dataPreprocess = new Pipeline(pipelineName);
+      dataPreprocess.addStage(new ResourceComputationStage());
+      dataPreprocess.addStage(new ResourceValidationStage());
+      dataPreprocess.addStage(new CurrentStateComputationStage());
+
+      // rebalance pipeline
+      // TODO: Junkai will work on refactoring existing pipeline log into abstract logic and
+      // extend the logic to separate pipeline
+      Pipeline rebalancePipeline = new Pipeline(pipelineName);
+      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new IntermediateStateCalcStage());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageSelectionStage());
+      rebalancePipeline.addStage(new MessageThrottleStage());
+      rebalancePipeline.addStage(new TaskAssignmentStage());
+
+      // backward compatibility check
+      Pipeline liveInstancePipeline = new Pipeline(pipelineName);
+      liveInstancePipeline.addStage(new CompatibilityCheckStage());
+
+      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline,
+          dataPreprocess, rebalancePipeline);
+      registry
+          .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) {
     this(registry, taskRegistry, null);
   }


[4/4] helix git commit: Async purge job for task framework

Posted by lx...@apache.org.
Async purge job for task framework

There are some bottlenecks identified from previous profiling: https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Task+Framework+Performance+Profiling
This is the reason we need to rearchitect the task framework for Helix. For task framework performance improvement, we need to make the purge job functionality asynchronous with respect to the existing pipeline, which originally introduced a heavy delay in pipeline execution.

This rb contains the change for moving purge jobs to be executed asynchronously. At the same time, it brings the impact that the job purge time may be slightly delayed because of the async task submission.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1ad490ec
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1ad490ec
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1ad490ec

Branch: refs/heads/master
Commit: 1ad490ec72a4732417aa84925f39afc6ce43092e
Parents: 50c9aa3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Jul 31 18:04:35 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:47:42 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  1 +
 .../controller/pipeline/AsyncWorkerType.java    |  3 +-
 .../rebalancer/util/RebalanceScheduler.java     | 19 +++++
 .../stages/TaskGarbageCollectionStage.java      | 33 ++++++++
 .../java/org/apache/helix/task/TaskUtil.java    | 83 +++++++++++++++++++-
 .../apache/helix/task/WorkflowRebalancer.java   | 67 ++--------------
 .../helix/integration/task/TaskTestUtil.java    | 21 +++++
 .../apache/helix/task/TestCleanExpiredJobs.java |  5 ++
 8 files changed, 171 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 48677d3..aae425f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -312,6 +312,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new TaskGarbageCollectionStage());
 
       // backward compatibility check
       Pipeline liveInstancePipeline = new Pipeline(pipelineName);

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index 443db31..f8d9967 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -29,5 +29,6 @@ package org.apache.helix.controller.pipeline;
 public enum AsyncWorkerType {
   TargetExternalViewCalcWorker,
   PersistAssignmentWorker,
-  ExternalViewComputeWorker
+  ExternalViewComputeWorker,
+  TaskJobPurgeWorker
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index 3fab8c4..ef2dc8d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -5,6 +5,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.IdealState;
 
+import org.apache.helix.model.ResourceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,4 +145,22 @@ public class RebalanceScheduler {
       LOG.warn("Can't find ideal state for {}", resource);
     }
   }
+
+  /**
+   * Trigger the controller to perform rebalance for a given resource.
+   * @param accessor Helix data accessor
+   * @param resource the name of the resource changed to triggering the execution
+   */
+  public static void invokeRebalanceForResourceConfig(HelixDataAccessor accessor, String resource) {
+    LOG.info("invoke rebalance for " + resource);
+    PropertyKey key = accessor.keyBuilder().resourceConfig(resource);
+    ResourceConfig cfg = accessor.getProperty(key);
+    if (cfg != null) {
+      if (!accessor.updateProperty(key, cfg)) {
+        LOG.warn("Failed to invoke rebalance on resource config {}", resource);
+      }
+    } else {
+      LOG.warn("Can't find resource config for {}", resource);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
new file mode 100644
index 0000000..e533c68
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -0,0 +1,33 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
+  private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
+
+  @Override
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.TaskJobPurgeWorker;
+  }
+
+  @Override
+  public void execute(ClusterEvent event) throws Exception {
+    ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    for (WorkflowConfig workflowConfig : clusterDataCache.getWorkflowConfigMap().values()) {
+      // clean up the expired jobs if it is a queue.
+      if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
+        TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+            clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+            _rebalanceScheduler);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 496b351..ded3aa2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,6 +36,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -648,6 +650,10 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+        if (jobConfig == null) {
+          LOG.error(String.format("Job %s exists in JobDAG but JobConfig is missing!", job));
+          continue;
+        }
         long expiry = jobConfig.getExpiry();
         if (expiry == workflowConfig.DEFAULT_EXPIRY || expiry < 0) {
           expiry = workflowConfig.getExpiry();
@@ -867,4 +873,79 @@ public class TaskUtil {
     TaskState jobState = workflowContext.getJobState(job);
     return (jobState != null && jobState != TaskState.NOT_STARTED);
   }
-}
\ No newline at end of file
+
+
+  /**
+   * Clean up all jobs that are COMPLETED and passes its expiry time.
+   * @param workflowConfig
+   * @param workflowContext
+   */
+
+  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, HelixManager manager,
+      RebalanceScheduler rebalanceScheduler) {
+    if (workflowContext == null) {
+      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+      return;
+    }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
+    final Set<String> expiredJobs = Sets.newHashSet();
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      expiredJobs.addAll(TaskUtil
+          .getExpiredJobs(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+              workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
+        for (String job : expiredJobs) {
+          if (!TaskUtil
+              .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+            failedJobRemovals.add(job);
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          rebalanceScheduler.removeScheduledRebalance(job);
+        }
+
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
+
+        if (!TaskUtil
+            .removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+          LOG.warn(
+              "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+                  + workflow);
+        }
+
+        if (expiredJobs.size() > 0) {
+          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+          // concurrent write issue. It is possible that jobs got purged but there is no event to
+          // trigger the pipeline to clean context.
+          HelixDataAccessor accessor = manager.getHelixDataAccessor();
+          List<String> resourceConfigs =
+              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+          if (resourceConfigs.size() > 0) {
+            RebalanceScheduler.invokeRebalanceForResourceConfig(manager.getHelixDataAccessor(),
+                resourceConfigs.get(0));
+          } else {
+            LOG.warn(
+                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+          }
+        }
+      }
+    }
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
+  }
+
+  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long nextPurgeTime = currentTime + purgeInterval;
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 165e61d..6d1bed5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -161,9 +161,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
 
-    // clean up the expired jobs if it is a queue.
     if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
-      purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+      Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
+      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
+      if (jobWithFinalStates.size() > 0) {
+        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
+        workflowCtx.removeJobStates(jobWithFinalStates);
+        workflowCtx.removeJobStartTime(jobWithFinalStates);
+      }
     }
 
     clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
@@ -523,66 +528,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
   }
 
-  /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
-   */
-  // TODO: run this in a separate thread.
-  // Get all jobConfigs & jobContext from ClusterCache.
-  private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(),
-          _manager.getHelixPropertyStore(), workflowConfig, workflowContext);
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          _rebalanceScheduler.removeScheduledRebalance(job);
-        }
-
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
-
-        if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
-        // remove job states in workflowContext.
-        workflowContext.removeJobStates(expiredJobs);
-        workflowContext.removeJobStartTime(expiredJobs);
-      }
-      workflowContext.setLastJobPurgeTime(currentTime);
-    }
-
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval);
-  }
-
-  private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, nextPurgeTime);
-    }
-  }
-
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 3918ab2..85c94c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -22,8 +22,10 @@ package org.apache.helix.integration.task;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixDataAccessor;
@@ -31,6 +33,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
@@ -42,6 +46,8 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
+import org.apache.helix.controller.stages.TaskSchedulingStage;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
@@ -295,12 +301,27 @@ public class TaskTestUtil {
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.PipelineType.name(), "TASK");
+
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool = new HashMap<>();
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>("ClusterName", AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            // TODO: retry when queue is empty and event.run() failed?
+            event.run();
+          }
+        };
+    worker.start();
+    asyncFIFOWorkerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), asyncFIFOWorkerPool);
 
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(new ReadClusterDataStage());
     stages.add(new ResourceComputationStage());
     stages.add(new CurrentStateComputationStage());
     stages.add(new BestPossibleStateCalcStage());
+    stages.add(new TaskGarbageCollectionStage());
 
     for (Stage stage : stages) {
       runStage(event, stage);

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index c513c01..d5dea6c 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -83,8 +83,13 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
     _cache.setTaskCache(true);
     TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Thread.sleep(500);
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
     Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+    _cache.requireFullRefresh();
+    _cache.refresh(_manager.getHelixDataAccessor());
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Thread.sleep(500);
     workflowContext = _driver.getWorkflowContext(queue);
     Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
         && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());