You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/17 22:51:36 UTC

[4/4] helix git commit: Async purge job for task framework

Async purge job for task framework

There are some bottlenecks identified from previous profiling: https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Task+Framework+Performance+Profiling
This is the reason we need to rearchitect the task framework for Helix. For task framework performance improvement, we need to make the purge job functionality asynchronous with respect to the existing pipeline, which originally introduced a heavy delay in pipeline execution.

This RB contains the change for moving purge jobs to asynchronous execution. At the same time, it has the side effect that job purging may be slightly delayed because of the async task submission.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1ad490ec
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1ad490ec
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1ad490ec

Branch: refs/heads/master
Commit: 1ad490ec72a4732417aa84925f39afc6ce43092e
Parents: 50c9aa3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Jul 31 18:04:35 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:47:42 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  1 +
 .../controller/pipeline/AsyncWorkerType.java    |  3 +-
 .../rebalancer/util/RebalanceScheduler.java     | 19 +++++
 .../stages/TaskGarbageCollectionStage.java      | 33 ++++++++
 .../java/org/apache/helix/task/TaskUtil.java    | 83 +++++++++++++++++++-
 .../apache/helix/task/WorkflowRebalancer.java   | 67 ++--------------
 .../helix/integration/task/TaskTestUtil.java    | 21 +++++
 .../apache/helix/task/TestCleanExpiredJobs.java |  5 ++
 8 files changed, 171 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 48677d3..aae425f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -312,6 +312,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new TaskGarbageCollectionStage());
 
       // backward compatibility check
       Pipeline liveInstancePipeline = new Pipeline(pipelineName);

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index 443db31..f8d9967 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -29,5 +29,6 @@ package org.apache.helix.controller.pipeline;
 public enum AsyncWorkerType {
   TargetExternalViewCalcWorker,
   PersistAssignmentWorker,
-  ExternalViewComputeWorker
+  ExternalViewComputeWorker,
+  TaskJobPurgeWorker
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index 3fab8c4..ef2dc8d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -5,6 +5,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.IdealState;
 
+import org.apache.helix.model.ResourceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,4 +145,22 @@ public class RebalanceScheduler {
       LOG.warn("Can't find ideal state for {}", resource);
     }
   }
+
+  /**
+   * Trigger the controller to perform rebalance for a given resource.
+   * @param accessor Helix data accessor
+   * @param resource the name of the resource changed to triggering the execution
+   */
+  public static void invokeRebalanceForResourceConfig(HelixDataAccessor accessor, String resource) {
+    LOG.info("invoke rebalance for " + resource);
+    PropertyKey key = accessor.keyBuilder().resourceConfig(resource);
+    ResourceConfig cfg = accessor.getProperty(key);
+    if (cfg != null) {
+      if (!accessor.updateProperty(key, cfg)) {
+        LOG.warn("Failed to invoke rebalance on resource config {}", resource);
+      }
+    } else {
+      LOG.warn("Can't find resource config for {}", resource);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
new file mode 100644
index 0000000..e533c68
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -0,0 +1,33 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
+  private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
+
+  @Override
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.TaskJobPurgeWorker;
+  }
+
+  @Override
+  public void execute(ClusterEvent event) throws Exception {
+    ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    for (WorkflowConfig workflowConfig : clusterDataCache.getWorkflowConfigMap().values()) {
+      // clean up the expired jobs if it is a queue.
+      if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
+        TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+            clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+            _rebalanceScheduler);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 496b351..ded3aa2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,6 +36,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -648,6 +650,10 @@ public class TaskUtil {
       for (String job : workflowConfig.getJobDag().getAllNodes()) {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
+        if (jobConfig == null) {
+          LOG.error(String.format("Job %s exists in JobDAG but JobConfig is missing!", job));
+          continue;
+        }
         long expiry = jobConfig.getExpiry();
         if (expiry == workflowConfig.DEFAULT_EXPIRY || expiry < 0) {
           expiry = workflowConfig.getExpiry();
@@ -867,4 +873,79 @@ public class TaskUtil {
     TaskState jobState = workflowContext.getJobState(job);
     return (jobState != null && jobState != TaskState.NOT_STARTED);
   }
-}
\ No newline at end of file
+
+
+  /**
+   * Clean up all jobs that are COMPLETED and passes its expiry time.
+   * @param workflowConfig
+   * @param workflowContext
+   */
+
+  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext, HelixManager manager,
+      RebalanceScheduler rebalanceScheduler) {
+    if (workflowContext == null) {
+      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+      return;
+    }
+    long purgeInterval = workflowConfig.getJobPurgeInterval();
+    long currentTime = System.currentTimeMillis();
+    final Set<String> expiredJobs = Sets.newHashSet();
+    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+      expiredJobs.addAll(TaskUtil
+          .getExpiredJobs(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+              workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty()) {
+        LOG.info("No job to purge for the queue " + workflow);
+      } else {
+        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+        Set<String> failedJobRemovals = new HashSet<>();
+        for (String job : expiredJobs) {
+          if (!TaskUtil
+              .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+            failedJobRemovals.add(job);
+            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+          }
+          rebalanceScheduler.removeScheduledRebalance(job);
+        }
+
+        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+        // removal will be tried again at next purge
+        expiredJobs.removeAll(failedJobRemovals);
+
+        if (!TaskUtil
+            .removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+          LOG.warn(
+              "Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+                  + workflow);
+        }
+
+        if (expiredJobs.size() > 0) {
+          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+          // concurrent write issue. It is possible that jobs got purged but there is no event to
+          // trigger the pipeline to clean context.
+          HelixDataAccessor accessor = manager.getHelixDataAccessor();
+          List<String> resourceConfigs =
+              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+          if (resourceConfigs.size() > 0) {
+            RebalanceScheduler.invokeRebalanceForResourceConfig(manager.getHelixDataAccessor(),
+                resourceConfigs.get(0));
+          } else {
+            LOG.warn(
+                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+          }
+        }
+      }
+    }
+    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
+  }
+
+  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+    long nextPurgeTime = currentTime + purgeInterval;
+    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 165e61d..6d1bed5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -161,9 +161,14 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
 
-    // clean up the expired jobs if it is a queue.
     if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
-      purgeExpiredJobs(workflow, workflowCfg, workflowCtx);
+      Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
+      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
+      if (jobWithFinalStates.size() > 0) {
+        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
+        workflowCtx.removeJobStates(jobWithFinalStates);
+        workflowCtx.removeJobStartTime(jobWithFinalStates);
+      }
     }
 
     clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
@@ -523,66 +528,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
   }
 
-  /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
-   */
-  // TODO: run this in a separate thread.
-  // Get all jobConfigs & jobContext from ClusterCache.
-  private void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      Set<String> expiredJobs = TaskUtil.getExpiredJobs(_manager.getHelixDataAccessor(),
-          _manager.getHelixPropertyStore(), workflowConfig, workflowContext);
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(_manager.getHelixDataAccessor(), _manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          _rebalanceScheduler.removeScheduledRebalance(job);
-        }
-
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
-
-        if (!TaskUtil.removeJobsFromDag(_manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
-        // remove job states in workflowContext.
-        workflowContext.removeJobStates(expiredJobs);
-        workflowContext.removeJobStartTime(expiredJobs);
-      }
-      workflowContext.setLastJobPurgeTime(currentTime);
-    }
-
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval);
-  }
-
-  private void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      _rebalanceScheduler.scheduleRebalance(_manager, workflow, nextPurgeTime);
-    }
-  }
-
   @Override
   public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     // Nothing to do here with workflow resource.
     return currentIdealState;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 3918ab2..85c94c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -22,8 +22,10 @@ package org.apache.helix.integration.task;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixDataAccessor;
@@ -31,6 +33,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
@@ -42,6 +46,8 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
+import org.apache.helix.controller.stages.TaskSchedulingStage;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
@@ -295,12 +301,27 @@ public class TaskTestUtil {
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.PipelineType.name(), "TASK");
+
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool = new HashMap<>();
+    DedupEventProcessor<String, Runnable> worker =
+        new DedupEventProcessor<String, Runnable>("ClusterName", AsyncWorkerType.TaskJobPurgeWorker.name()) {
+          @Override
+          protected void handleEvent(Runnable event) {
+            // TODO: retry when queue is empty and event.run() failed?
+            event.run();
+          }
+        };
+    worker.start();
+    asyncFIFOWorkerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), asyncFIFOWorkerPool);
 
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(new ReadClusterDataStage());
     stages.add(new ResourceComputationStage());
     stages.add(new CurrentStateComputationStage());
     stages.add(new BestPossibleStateCalcStage());
+    stages.add(new TaskGarbageCollectionStage());
 
     for (Stage stage : stages) {
       runStage(event, stage);

http://git-wip-us.apache.org/repos/asf/helix/blob/1ad490ec/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
index c513c01..d5dea6c 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestCleanExpiredJobs.java
@@ -83,8 +83,13 @@ public class TestCleanExpiredJobs extends TaskSynchronizedTestBase {
     _cache.setTaskCache(true);
     TaskUtil.setWorkflowContext(_manager, queue, workflowContext);
     TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Thread.sleep(500);
     WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queue);
     Assert.assertEquals(workflowConfig.getJobDag().getAllNodes(), jobsLeft);
+    _cache.requireFullRefresh();
+    _cache.refresh(_manager.getHelixDataAccessor());
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Thread.sleep(500);
     workflowContext = _driver.getWorkflowContext(queue);
     Assert.assertTrue(workflowContext.getLastJobPurgeTime() > startTime
         && workflowContext.getLastJobPurgeTime() < System.currentTimeMillis());