You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/04 21:14:45 UTC
[helix] 09/10: Recover Workflow GC Logic (#1181)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3501111a3b9cae27bb7b875fadbf49406a4b6cd0
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Wed Jul 29 10:32:59 2020 -0700
Recover Workflow GC Logic (#1181)
Recover Workflow Garbage Collection Logic
Recover Workflow Garbage Collection Logic
---
.../helix/controller/stages/AttributeName.java | 6 +-
.../stages/TaskGarbageCollectionStage.java | 95 +++++++++--
.../main/java/org/apache/helix/task/TaskUtil.java | 173 ++++++++++++---------
.../org/apache/helix/task/WorkflowDispatcher.java | 14 --
.../helix/controller/stages/TestTaskStage.java | 89 +++++++++--
.../integration/task/TestJobQueueCleanUp.java | 19 ++-
.../task/TestWorkflowContextWithoutConfig.java | 66 ++++++++
.../java/org/apache/helix/task/TestTaskUtil.java | 95 +++++++++++
8 files changed, 438 insertions(+), 119 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 589988f..9a0bbb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,5 +40,9 @@ public enum AttributeName {
PipelineType,
LastRebalanceFinishTimeStamp,
ControllerDataProvider,
- STATEFUL_REBALANCER
+ STATEFUL_REBALANCER,
+ // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+ TO_BE_PURGED_WORKFLOWS,
+ // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+ TO_BE_PURGED_JOBS_MAP
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 7eb0db9..915cba1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -19,7 +19,10 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixManager;
@@ -29,9 +32,12 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -42,34 +48,89 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
}
@Override
- public void execute(ClusterEvent event) {
- WorkflowControllerDataProvider dataProvider =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
+ public void process(ClusterEvent event) throws Exception {
+ // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
+ // This is to avoid race conditions since the cache will be modified. After this work, then the
+ // async work will happen.
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
- for (WorkflowConfig workflowConfig : existingWorkflows) {
- // clean up the expired jobs if it is a queue.
+ Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+ Set<String> workflowsToBePurged = new HashSet<>();
+ WorkflowControllerDataProvider dataProvider =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
+ for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+ WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
+ if (workflowContext == null) {
+ continue;
}
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
+ long nextPurgeTime = workflowContext.getLastJobPurgeTime() + purgeInterval;
+ if (purgeInterval > 0 && nextPurgeTime <= currentTime) {
+ nextPurgeTime = currentTime + purgeInterval;
+ // Find jobs that are ready to be purged
+ Set<String> expiredJobs =
+ TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+ if (!expiredJobs.isEmpty()) {
+ expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+ }
+ }
+ scheduleNextJobPurge(workflowConfig.getWorkflowId(), nextPurgeTime, _rebalanceScheduler,
+ manager);
+ } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
+ .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+ // Find workflows that need to be purged
+ workflowsToBePurged.add(entry.getKey());
}
}
+ event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
+ Collections.unmodifiableMap(expiredJobsMap));
+ event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
+ Collections.unmodifiableSet(workflowsToBePurged));
+
+ super.process(event);
+ }
+ @Override
+ public void execute(ClusterEvent event) {
+ HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+ if (manager == null) {
+ LOG.warn(
+ "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
+ event.getEventId(), event.getEventType(), event.getClusterName());
+ return;
+ }
+
+ Map<String, Set<String>> expiredJobsMap =
+ event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
+ Set<String> toBePurgedWorkflows =
+ event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+
+ for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
+ try {
+ TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager, _rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
+ }
+ }
+
+ TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+ }
+
+ private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
+ RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+ long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+ if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+ rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+ }
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index e11f49e..e6e792f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,20 +735,8 @@ public class TaskUtil {
for (String job : workflowConfig.getJobDag().getAllNodes()) {
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
- if (jobConfig == null) {
- LOG.error(String.format(
- "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
- job, workflowConfig.getWorkflowId()));
- // Add the job name to expiredJobs so that purge operation will be tried again on this job
+ if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
expiredJobs.add(job);
- continue;
- }
- long expiry = jobConfig.getExpiry();
- if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
- if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
- && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
- expiredJobs.add(job);
- }
}
}
}
@@ -756,6 +744,52 @@ public class TaskUtil {
}
/**
+ * Based on a workflow's config or context, create a set of jobs that are either expired, which
+ * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
+ * meaning that the job might have been deleted manually from a job queue, or is left in the
+ * DAG due to a failed clean-up attempt from last purge. The difference between this function and
+ * getExpiredJobs() is that this function gets JobConfig and JobContext from a
+ * WorkflowControllerDataProvider instead of Zk.
+ * @param workflowControllerDataProvider
+ * @param workflowConfig
+ * @param workflowContext
+ * @return
+ */
+ public static Set<String> getExpiredJobsFromCache(
+ WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext) {
+ Set<String> expiredJobs = new HashSet<>();
+ Map<String, TaskState> jobStates = workflowContext.getJobStates();
+ for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+ JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
+ if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+ expiredJobs.add(job);
+ }
+ }
+ return expiredJobs;
+ }
+
+ /*
+ * Checks if a job is expired and should be purged. This includes a special case when jobConfig
+ * is null. That happens when a job might have been deleted manually from a job queue, or is
+ * left in the DAG due to a failed clean-up attempt from last purge.
+ */
+ private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
+ TaskState jobState) {
+ if (jobConfig == null) {
+ LOG.warn(
+ "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
+ jobName);
+ return true;
+ }
+ long expiry = jobConfig.getExpiry();
+ return jobContext != null && jobState == TaskState.COMPLETED
+ && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+ && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
+ }
+
+ /**
* Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
* @param accessor
* @param propertyStore
@@ -977,72 +1011,71 @@ public class TaskUtil {
}
/**
- * Clean up all jobs that are COMPLETED and passes its expiry time.
- * @param workflowConfig
- * @param workflowContext
+ * Clean up all jobs that are marked as expired.
*/
- public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
- WorkflowContext workflowContext, HelixManager manager,
- RebalanceScheduler rebalanceScheduler) {
- if (workflowContext == null) {
- LOG.warn(String.format("Workflow %s context does not exist!", workflow));
- return;
+ public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+ HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+ Set<String> failedJobRemovals = new HashSet<>();
+ for (String job : expiredJobs) {
+ if (!TaskUtil
+ .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+ failedJobRemovals.add(job);
+ LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
+ }
+ rebalanceScheduler.removeScheduledRebalance(job);
}
- long purgeInterval = workflowConfig.getJobPurgeInterval();
- long currentTime = System.currentTimeMillis();
- final Set<String> expiredJobs = Sets.newHashSet();
- if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
- expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
- manager.getHelixPropertyStore(), workflowConfig, workflowContext));
- if (expiredJobs.isEmpty()) {
- LOG.info("No job to purge for the queue " + workflow);
- } else {
- LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
- Set<String> failedJobRemovals = new HashSet<>();
- for (String job : expiredJobs) {
- if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
- job)) {
- failedJobRemovals.add(job);
- LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
- }
- rebalanceScheduler.removeScheduledRebalance(job);
- }
- // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
- // removal will be tried again at next purge
- expiredJobs.removeAll(failedJobRemovals);
+ // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+ // removal will be tried again at next purge
+ expiredJobs.removeAll(failedJobRemovals);
- if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
- true)) {
- LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
- + " from the workflow " + workflow);
- }
+ if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+ LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
+ workflow);
+ }
- if (expiredJobs.size() > 0) {
- // Update workflow context will be in main pipeline not here. Otherwise, it will cause
- // concurrent write issue. It is possible that jobs got purged but there is no event to
- // trigger the pipeline to clean context.
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- List<String> resourceConfigs =
- accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
- if (resourceConfigs.size() > 0) {
- RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
- } else {
- LOG.warn(
- "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
- }
- }
+ if (expiredJobs.size() > 0) {
+ // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+ // concurrent write issue. It is possible that jobs got purged but there is no event to
+ // trigger the pipeline to clean context.
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ List<String> resourceConfigs =
+ accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+ if (resourceConfigs.size() > 0) {
+ RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+ } else {
+ LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
+ expiredJobs);
}
}
- setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
}
- private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
- RebalanceScheduler rebalanceScheduler, HelixManager manager) {
- long nextPurgeTime = currentTime + purgeInterval;
- long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
- if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
- rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+ /**
+ * The function that removes IdealStates and workflow contexts of the workflows that need to be
+ * deleted.
+ * @param toBePurgedWorkflows
+ * @param manager
+ */
+ public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
+ final HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+ for (String workflowName : toBePurgedWorkflows) {
+ LOG.warn(
+ "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowContext and IdealState!!",
+ workflowName);
+
+ // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
+ if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+ LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
+ workflowName);
+ continue;
+ }
+
+ if (!removeWorkflowContext(propertyStore, workflowName)) {
+ LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
+ }
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 53be558..4c9bd18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,20 +356,6 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
TaskConstants.STATE_MODEL_NAME);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-
- // Set the job configuration
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- HelixProperty resourceConfig = new HelixProperty(jobResource);
- resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
- Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
- if (taskConfigMap != null) {
- for (TaskConfig taskConfig : taskConfigMap.values()) {
- resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
- }
- }
- accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
-
// Push out new ideal state based on number of target partitions
IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index 99e227c..43fda00 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -19,9 +19,17 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -85,7 +93,7 @@ public class TestTaskStage extends TaskTestBase {
TaskConstants.STATE_MODEL_NAME);
// Create the context
- WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+ WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -144,15 +152,34 @@ public class TestTaskStage extends TaskTestBase {
* async job purge will try to delete it again.
*/
@Test(dependsOnMethods = "testPersistContextData")
- public void testPartialDataPurge() {
+ public void testPartialDataPurge() throws Exception {
+ DedupEventProcessor<String, Runnable> worker =
+ new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+ AsyncWorkerType.TaskJobPurgeWorker.name()) {
+ @Override
+ protected void handleEvent(Runnable event) {
+ event.run();
+ }
+ };
+ worker.start();
+ Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+ workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+ _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
// Manually delete JobConfig
deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
+ // Manually refresh because there's no controller notify data change
+ BaseControllerDataProvider dataProvider =
+ _event.getAttribute(AttributeName.ControllerDataProvider.name());
+ dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+ dataProvider.refresh(_manager.getHelixDataAccessor());
+
// Then purge jobs
TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
- garbageCollectionStage.execute(_event);
+ garbageCollectionStage.process(_event);
// Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
// IdealState check
@@ -161,6 +188,41 @@ public class TestTaskStage extends TaskTestBase {
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
}
+ @Test(dependsOnMethods = "testPartialDataPurge")
+ public void testWorkflowGarbageCollection() throws Exception {
+ DedupEventProcessor<String, Runnable> worker =
+ new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
+ AsyncWorkerType.TaskJobPurgeWorker.name()) {
+ @Override
+ protected void handleEvent(Runnable event) {
+ event.run();
+ }
+ };
+ worker.start();
+ Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
+ workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
+ _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
+
+ String zkPath =
+ _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
+ _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
+
+ // Manually refresh because there's no controller notify data change
+ BaseControllerDataProvider dataProvider =
+ _event.getAttribute(AttributeName.ControllerDataProvider.name());
+ dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+ dataProvider.refresh(_manager.getHelixDataAccessor());
+
+ // Then garbage collect workflow
+ TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+ garbageCollectionStage.process(_event);
+
+ // Check that IS and contexts have been purged for the workflow
+ checkForIdealStateAndContextRemoval(_testWorkflow);
+
+ worker.shutdown();
+ }
+
private void deleteJobConfigs(String workflowName, String jobName) {
String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -169,16 +231,23 @@ public class TestTaskStage extends TaskTestBase {
_baseAccessor.remove(newPath, AccessOption.PERSISTENT);
}
- private void checkForIdealStateAndContextRemoval(String workflow, String job) {
- // IdealState
- Assert.assertFalse(
- _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
-
+ private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
// JobContexts in old ZNode path
String oldPath =
String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
- Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
- Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+
+ Assert.assertTrue(TestHelper.verify(
+ () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
+ && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
+ .exists(newPath, AccessOption.PERSISTENT), 120000));
+ }
+
+ private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
+ Assert.assertTrue(TestHelper.verify(() ->
+ !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
+ && !_baseAccessor
+ .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
+ 120000));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index daa9b4b..ba4fb44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -80,7 +80,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
}
@Test
- public void testJobQueueAutoCleanUp() throws InterruptedException {
+ public void testJobQueueAutoCleanUp() throws Exception {
int capacity = 10;
String queueName = TestHelper.getTestMethodName();
JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName, capacity);
@@ -105,14 +105,19 @@ public class TestJobQueueCleanUp extends TaskTestBase {
}
_driver.start(builder.build());
_driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (capacity - 1)), TaskState.FAILED);
- Thread.sleep(2000);
- WorkflowConfig config = _driver.getWorkflowConfig(queueName);
- Assert.assertEquals(config.getJobDag().getAllNodes(), remainJobs);
+ Assert
+ .assertTrue(TestHelper.verify(() -> {
+ WorkflowConfig config = _driver.getWorkflowConfig(queueName);
+ System.out.println("|Current time: " + System.currentTimeMillis() +" **TEST: " + config.getJobDag().getAllNodes());
+ return config.getJobDag().getAllNodes().equals(remainJobs);
+ }, TestHelper.WAIT_DURATION));
- WorkflowContext context = _driver.getWorkflowContext(queueName);
- Assert.assertEquals(context.getJobStates().keySet(), remainJobs);
- Assert.assertTrue(remainJobs.containsAll(context.getJobStartTimes().keySet()));
+ Assert.assertTrue(TestHelper.verify(() -> {
+ WorkflowContext context = _driver.getWorkflowContext(queueName);
+ return context.getJobStates().keySet().equals(remainJobs) && remainJobs
+ .containsAll(context.getJobStartTimes().keySet());
+ }, TestHelper.WAIT_DURATION));
for (String job : deletedJobs) {
JobConfig cfg = _driver.getJobConfig(job);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 70dd33f..31010cf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,6 +103,72 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
Assert.assertTrue(workflowContextNotCreated);
}
+ @Test
+ public void testWorkflowContextGarbageCollection() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+ _driver.start(builder1.build());
+
+ // Wait until workflow is created and IN_PROGRESS state.
+ _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+ // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
+ // workflow
+ Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+ Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+ Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
+
+ String workflowContextPath =
+ "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
+
+ ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+ null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(record);
+
+ // Wait until workflow is completed.
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
+ // expired.
+ boolean workflowExpired = TestHelper.verify(() -> {
+ WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+ WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+ IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
+ return (wCtx == null && wCfg == null && idealState == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(workflowExpired);
+
+ _controller.syncStop();
+
+ // Write workflow context to ZooKeeper
+ _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
+ AccessOption.PERSISTENT);
+
+ // Verify context is written back to ZK.
+ record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+ null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(record);
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Create and start new workflow just to make sure controller is running and new workflow is
+ // scheduled successfully.
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+ _driver.start(builder2.build());
+ _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+ // Verify that WorkflowContext will be deleted
+ boolean contextDeleted = TestHelper.verify(() -> {
+ WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+ return (wCtx == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(contextDeleted);
+ }
+
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..56e756d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskUtil extends TaskTestBase {
+
+ @Test
+ public void testGetExpiredJobsFromCache() {
+ String workflowName = "TEST_WORKFLOW";
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
+
+ JobConfig.Builder jobBuilder_0 =
+ new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_1 =
+ new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_2 =
+ new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ JobConfig.Builder jobBuilder_3 =
+ new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
+ .setExpiry(1L);
+ Workflow jobQueue =
+ queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+ .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
+
+ WorkflowContext workflowContext = mock(WorkflowContext.class);
+ Map<String, TaskState> jobStates = new HashMap<>();
+ jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
+ jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
+ jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
+ jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
+ when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+ JobConfig jobConfig = mock(JobConfig.class);
+ WorkflowControllerDataProvider workflowControllerDataProvider =
+ mock(WorkflowControllerDataProvider.class);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
+ .thenReturn(jobConfig);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
+ .thenReturn(jobConfig);
+ when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
+ .thenReturn(jobConfig);
+
+ JobContext jobContext = mock(JobContext.class);
+ when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
+
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
+ .thenReturn(jobContext);
+ when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
+ .thenReturn(jobContext);
+
+ Set<String> expectedJobs = new HashSet<>();
+ expectedJobs.add(workflowName + "_Job_0");
+ expectedJobs.add(workflowName + "_Job_3");
+ Assert.assertEquals(TaskUtil
+ .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
+ workflowContext), expectedJobs);
+ }
+}