Posted to commits@helix.apache.org by jx...@apache.org on 2020/07/17 19:07:32 UTC
[helix] branch master updated: Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fcfa37f Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)
fcfa37f is described below
commit fcfa37f199d587f3043f4fb9d1d289412c2f6750
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri Jul 17 12:06:08 2020 -0700
Revert "Recover Workflow Garbage Collection Logic (#1076)" (#1155)
This reverts commit c7a97bdce66b21dab5522a4f100d08054ada99c2.
---
.../helix/controller/stages/AttributeName.java | 6 +-
.../stages/TaskGarbageCollectionStage.java | 93 ++---------
.../main/java/org/apache/helix/task/TaskUtil.java | 173 +++++++++------------
.../org/apache/helix/task/WorkflowDispatcher.java | 14 ++
.../helix/controller/stages/TestTaskStage.java | 89 ++---------
.../task/TestWorkflowContextWithoutConfig.java | 66 --------
.../java/org/apache/helix/task/TestTaskUtil.java | 95 -----------
7 files changed, 112 insertions(+), 424 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 557bb20..0af0ee5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -40,9 +40,5 @@ public enum AttributeName {
PipelineType,
LastRebalanceFinishTimeStamp,
ControllerDataProvider,
- STATEFUL_REBALANCER,
- // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
- TO_BE_PURGED_WORKFLOWS,
- // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
- TO_BE_PURGED_JOBS_MAP
+ STATEFUL_REBALANCER
}
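
The two attributes removed here existed so the stage's synchronous process() step could hand immutable snapshots to its asynchronous execute() step through the ClusterEvent, which is why the deleted comments warn that using them elsewhere risks race conditions. A minimal sketch of that handoff pattern, using only the calls visible later in this diff (illustrative, not a compilable excerpt of Helix):

    // On the main pipeline thread: compute purge targets, publish read-only snapshots.
    Map<String, Set<String>> expiredJobsMap = new HashMap<>();   // filled from the cache
    event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
        Collections.unmodifiableMap(expiredJobsMap));
    // Later, on the async worker thread: read the snapshot back off the event.
    Map<String, Set<String>> toPurge =
        event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());

With the revert, the attributes go away because the stage below no longer splits the work into two phases.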
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index f965a3e..3f8e744 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,9 +1,6 @@
package org.apache.helix.controller.stages;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixManager;
@@ -13,12 +10,9 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@@ -29,87 +23,34 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
}
@Override
- public void process(ClusterEvent event) throws Exception {
- // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
- // This is to avoid race conditions since the cache will be modified. After this work, then the
- // async work will happen.
+ public void execute(ClusterEvent event) {
+ WorkflowControllerDataProvider dataProvider =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
- if (manager == null) {
+
+ if (dataProvider == null || manager == null) {
LOG.warn(
- "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Map<String, Set<String>> expiredJobsMap = new HashMap<>();
- Set<String> workflowsToBePurged = new HashSet<>();
- WorkflowControllerDataProvider dataProvider =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
- for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
- WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+ Set<WorkflowConfig> existingWorkflows =
+ new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+ for (WorkflowConfig workflowConfig : existingWorkflows) {
+ // clean up the expired jobs if it is a queue.
if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
.isJobQueue())) {
- WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
- long purgeInterval = workflowConfig.getJobPurgeInterval();
- long currentTime = System.currentTimeMillis();
- if (purgeInterval > 0
- && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
- // Find jobs that are ready to be purged
- Set<String> expiredJobs =
- TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
- if (!expiredJobs.isEmpty()) {
- expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
- }
- scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,
- _rebalanceScheduler, manager);
+ try {
+ TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+ dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+ _rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+ workflowConfig.getWorkflowId(), e.toString()));
}
- } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId()
- .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
- // Find workflows that need to be purged
- workflowsToBePurged.add(entry.getKey());
- }
- }
- event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
- Collections.unmodifiableMap(expiredJobsMap));
- event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
- Collections.unmodifiableSet(workflowsToBePurged));
-
- super.process(event);
- }
-
- @Override
- public void execute(ClusterEvent event) {
- HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
- if (manager == null) {
- LOG.warn(
- "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.",
- event.getEventId(), event.getEventType(), event.getClusterName());
- return;
- }
-
- Map<String, Set<String>> expiredJobsMap =
- event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
- Set<String> toBePurgedWorkflows =
- event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
-
- for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
- try {
- TaskUtil.purgeExpiredJobs(entry.getKey(), entry.getValue(), manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
}
}
- TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
- }
-
- private static void scheduleNextJobPurge(String workflow, long currentTime, long purgeInterval,
- RebalanceScheduler rebalanceScheduler, HelixManager manager) {
- long nextPurgeTime = currentTime + purgeInterval;
- long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
- if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
- rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
- }
}
}
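
The deleted process() override carried the design rationale: the controller cache is refreshed on the pipeline thread, so purge targets were computed there and only immutable snapshots were consumed by the async worker. The restored code instead reads the cache directly from execute() on the worker thread. A generic, self-contained sketch of the hazard the two-phase split guarded against (plain Java, not Helix code; names are invented):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Iterating a HashMap on a worker thread while the owning thread keeps writing
    // to it can throw ConcurrentModificationException (or silently miss entries).
    class SnapshotHandoff {
      public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("workflow1", "ctx1");
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // Unsafe: the worker reads the live cache while this thread mutates it.
        worker.submit(() -> cache.forEach((k, v) -> System.out.println("unsafe " + k)));
        cache.put("workflow2", "ctx2");

        // Safe: publish an immutable copy; the worker only ever sees the snapshot.
        Map<String, String> snapshot = Collections.unmodifiableMap(new HashMap<>(cache));
        worker.submit(() -> snapshot.forEach((k, v) -> System.out.println("safe " + k)));

        worker.shutdown();
      }
    }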
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ec8d216..917a69b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -735,8 +735,20 @@ public class TaskUtil {
for (String job : workflowConfig.getJobDag().getAllNodes()) {
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
- if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
+ if (jobConfig == null) {
+ LOG.error(String.format(
+ "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+ job, workflowConfig.getWorkflowId()));
+ // Add the job name to expiredJobs so that purge operation will be tried again on this job
expiredJobs.add(job);
+ continue;
+ }
+ long expiry = jobConfig.getExpiry();
+ if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
+ if (jobContext.getFinishTime() != WorkflowContext.UNFINISHED
+ && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry) {
+ expiredJobs.add(job);
+ }
}
}
}
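
Restated, the inline check that replaces isJobExpired() purges a job only when it is COMPLETED, has a recorded finish time, and that finish time plus the configured expiry has passed; the missing-JobConfig case is handled separately above so the purge is retried. A self-contained restatement of the predicate, assuming the usual -1 sentinel for WorkflowContext.UNFINISHED:

    static final long UNFINISHED = -1L; // assumed value of WorkflowContext.UNFINISHED

    static boolean isExpired(boolean completed, long finishTime, long expiry, long now) {
      return completed && finishTime != UNFINISHED && now >= finishTime + expiry;
    }

    // isExpired(true, 1_000L, 500L, 2_000L)     -> true  (finished at 1000, expired at 1500)
    // isExpired(true, 1_000L, 500L, 1_200L)     -> false (not yet past 1500)
    // isExpired(true, UNFINISHED, 500L, 2_000L) -> false (no finish time recorded)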
@@ -744,52 +756,6 @@ public class TaskUtil {
}
/**
- * Based on a workflow's config or context, create a set of jobs that are either expired, which
- * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs,
- * meaning that the job might have been deleted manually from the a job queue, or is left in the
- * DAG due to a failed clean-up attempt from last purge. The difference between this function and
- * getExpiredJobs() is that this function gets JobConfig and JobContext from a
- * WorkflowControllerDataProvider instead of Zk.
- * @param workflowControllerDataProvider
- * @param workflowConfig
- * @param workflowContext
- * @return
- */
- public static Set<String> getExpiredJobsFromCache(
- WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
- WorkflowContext workflowContext) {
- Set<String> expiredJobs = new HashSet<>();
- Map<String, TaskState> jobStates = workflowContext.getJobStates();
- for (String job : workflowConfig.getJobDag().getAllNodes()) {
- JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
- JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
- if (isJobExpired(job, jobConfig, jobContext, jobStates.get(job))) {
- expiredJobs.add(job);
- }
- }
- return expiredJobs;
- }
-
- /*
- * Checks if a job is expired and should be purged. This includes a special case when jobConfig
- * is null. That happens when a job might have been deleted manually from the a job queue, or is
- * left in the DAG due to a failed clean-up attempt from last purge.
- */
- private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext,
- TaskState jobState) {
- if (jobConfig == null) {
- LOG.warn(
- "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.",
- jobName);
- return true;
- }
- long expiry = jobConfig.getExpiry();
- return jobContext != null && jobState == TaskState.COMPLETED
- && jobContext.getFinishTime() != WorkflowContext.UNFINISHED
- && System.currentTimeMillis() >= jobContext.getFinishTime() + expiry;
- }
-
- /**
* Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name.
* @param accessor
* @param propertyStore
@@ -1011,71 +977,72 @@ public class TaskUtil {
}
/**
- * Clean up all jobs that are marked as expired.
+ * Clean up all jobs that are COMPLETED and passes its expiry time.
+ * @param workflowConfig
+ * @param workflowContext
*/
- public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
- HelixManager manager, RebalanceScheduler rebalanceScheduler) {
- Set<String> failedJobRemovals = new HashSet<>();
- for (String job : expiredJobs) {
- if (!TaskUtil
- .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
- failedJobRemovals.add(job);
- LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
- }
- rebalanceScheduler.removeScheduledRebalance(job);
+ public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
+ WorkflowContext workflowContext, HelixManager manager,
+ RebalanceScheduler rebalanceScheduler) {
+ if (workflowContext == null) {
+ LOG.warn(String.format("Workflow %s context does not exist!", workflow));
+ return;
}
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
+ final Set<String> expiredJobs = Sets.newHashSet();
+ if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+ expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
+ manager.getHelixPropertyStore(), workflowConfig, workflowContext));
+ if (expiredJobs.isEmpty()) {
+ LOG.info("No job to purge for the queue " + workflow);
+ } else {
+ LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+ Set<String> failedJobRemovals = new HashSet<>();
+ for (String job : expiredJobs) {
+ if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+ job)) {
+ failedJobRemovals.add(job);
+ LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+ }
+ rebalanceScheduler.removeScheduledRebalance(job);
+ }
- // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
- // removal will be tried again at next purge
- expiredJobs.removeAll(failedJobRemovals);
+ // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+ // removal will be tried again at next purge
+ expiredJobs.removeAll(failedJobRemovals);
- if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
- LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
- workflow);
- }
+ if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
+ true)) {
+ LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
+ + " from the workflow " + workflow);
+ }
- if (expiredJobs.size() > 0) {
- // Update workflow context will be in main pipeline not here. Otherwise, it will cause
- // concurrent write issue. It is possible that jobs got purged but there is no event to
- // trigger the pipeline to clean context.
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- List<String> resourceConfigs =
- accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
- if (resourceConfigs.size() > 0) {
- RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
- } else {
- LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!",
- expiredJobs);
+ if (expiredJobs.size() > 0) {
+ // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+ // concurrent write issue. It is possible that jobs got purged but there is no event to
+ // trigger the pipeline to clean context.
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ List<String> resourceConfigs =
+ accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+ if (resourceConfigs.size() > 0) {
+ RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+ } else {
+ LOG.warn(
+ "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
+ }
+ }
}
}
+ setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
}
- /**
- * The function that removes IdealStates and workflow contexts of the workflows that need to be
- * deleted.
- * @param toBePurgedWorkflows
- * @param manager
- */
- public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows,
- final HelixManager manager) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
-
- for (String workflowName : toBePurgedWorkflows) {
- LOG.warn(
- "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
- workflowName);
-
- // TODO: We dont need this in the future when TF is not relying on IS/EV anymore.
- if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
- LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
- workflowName);
- continue;
- }
-
- if (!removeWorkflowContext(propertyStore, workflowName)) {
- LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName);
- }
+ private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
+ RebalanceScheduler rebalanceScheduler, HelixManager manager) {
+ long nextPurgeTime = currentTime + purgeInterval;
+ long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
+ if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
+ rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
}
}
}
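
setNextJobPurgeTime(), restored above, only ever moves the scheduled rebalance earlier: -1 means nothing is scheduled, and an existing later schedule is pulled forward so the purge is never delayed past currentTime + purgeInterval. A self-contained sketch of that decision with a worked example (names are illustrative):

    // Returns when the next purge should fire; -1 for 'scheduled' means none pending.
    static long nextPurgeAt(long scheduled, long now, long purgeInterval) {
      long next = now + purgeInterval;
      return (scheduled == -1 || scheduled > next) ? next : scheduled;
    }

    // With now = 1_000 and purgeInterval = 600:
    // nextPurgeAt(-1,    1_000, 600) -> 1_600 (nothing pending, schedule it)
    // nextPurgeAt(2_000, 1_000, 600) -> 1_600 (pending purge too late, pull it earlier)
    // nextPurgeAt(1_200, 1_000, 600) -> 1_200 (an earlier purge is already pending)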
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 8c49e1e..89e6d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -356,6 +356,20 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
TaskConstants.STATE_MODEL_NAME);
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ // Set the job configuration
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ HelixProperty resourceConfig = new HelixProperty(jobResource);
+ resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+ Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+ if (taskConfigMap != null) {
+ for (TaskConfig taskConfig : taskConfigMap.values()) {
+ resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ }
+ }
+ accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+
// Push out new ideal state based on number of target partitions
IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
builder.setRebalancerMode(IdealState.RebalanceMode.TASK);
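
The restored block persists a ResourceConfig ZNode for the job alongside its IdealState: the job's resource-config map becomes the record's simple fields, and each TaskConfig becomes one map field keyed by task id. A hedged sketch of the resulting record shape, using the same ZNRecord methods as the code above but with invented field names and values:

    ZNRecord record = new ZNRecord("myWorkflow_myJob");       // id = jobResource
    record.getSimpleFields().put("TargetResource", "myDB");   // from jobConfig.getResourceConfigMap()
    record.setMapField("task-0", Collections.singletonMap("TASK_ID", "task-0"));
    record.setMapField("task-1", Collections.singletonMap("TASK_ID", "task-1"));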
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
index db27355..fefc737 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -1,16 +1,8 @@
package org.apache.helix.controller.stages;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.helix.AccessOption;
-import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
-import org.apache.helix.common.DedupEventProcessor;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-import org.apache.helix.controller.pipeline.AsyncWorkerType;
-import org.apache.helix.task.TaskUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -74,7 +66,7 @@ public class TestTaskStage extends TaskTestBase {
TaskConstants.STATE_MODEL_NAME);
// Create the context
- WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+ WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -133,34 +125,15 @@ public class TestTaskStage extends TaskTestBase {
* async job purge will try to delete it again.
*/
@Test(dependsOnMethods = "testPersistContextData")
- public void testPartialDataPurge() throws Exception {
- DedupEventProcessor<String, Runnable> worker =
- new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
- AsyncWorkerType.TaskJobPurgeWorker.name()) {
- @Override
- protected void handleEvent(Runnable event) {
- event.run();
- }
- };
- worker.start();
- Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
- workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
- _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
+ public void testPartialDataPurge() {
// Manually delete JobConfig
deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
- // Manually refresh because there's no controller notify data change
- BaseControllerDataProvider dataProvider =
- _event.getAttribute(AttributeName.ControllerDataProvider.name());
- dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
- dataProvider.refresh(_manager.getHelixDataAccessor());
-
// Then purge jobs
TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
- garbageCollectionStage.process(_event);
+ garbageCollectionStage.execute(_event);
// Check that IS and contexts have been purged for the 2 jobs in both old and new ZNode paths
// IdealState check
@@ -169,41 +142,6 @@ public class TestTaskStage extends TaskTestBase {
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
}
- @Test(dependsOnMethods = "testPartialDataPurge")
- public void testWorkflowGarbageCollection() throws Exception {
- DedupEventProcessor<String, Runnable> worker =
- new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
- AsyncWorkerType.TaskJobPurgeWorker.name()) {
- @Override
- protected void handleEvent(Runnable event) {
- event.run();
- }
- };
- worker.start();
- Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
- workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
- _event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
-
- String zkPath =
- _manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
- _baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
-
- // Manually refresh because there's no controller notify data change
- BaseControllerDataProvider dataProvider =
- _event.getAttribute(AttributeName.ControllerDataProvider.name());
- dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
- dataProvider.refresh(_manager.getHelixDataAccessor());
-
- // Then garbage collect workflow
- TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
- garbageCollectionStage.process(_event);
-
- // Check that IS and contexts have been purged for the workflow
- checkForIdealStateAndContextRemoval(_testWorkflow);
-
- worker.shutdown();
- }
-
private void deleteJobConfigs(String workflowName, String jobName) {
String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
String newPath = _manager.getHelixDataAccessor().keyBuilder()
@@ -212,23 +150,16 @@ public class TestTaskStage extends TaskTestBase {
_baseAccessor.remove(newPath, AccessOption.PERSISTENT);
}
- private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
+ private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+ // IdealState
+ Assert.assertFalse(
+ _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
// JobContexts in old ZNode path
String oldPath =
String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
-
- Assert.assertTrue(TestHelper.verify(
- () -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
- && !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
- .exists(newPath, AccessOption.PERSISTENT), 120000));
- }
-
- private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
- Assert.assertTrue(TestHelper.verify(() ->
- !_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
- && !_baseAccessor
- .exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
- 120000));
+ Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+ Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
}
}
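
The assertion changes track the threading change: before the revert, purging ran on an async worker, so the deleted checks had to poll via TestHelper.verify() with a 120-second timeout; with execute() invoked directly and purging done inline, a plain Assert.assertFalse() on each ZNode suffices. A self-contained sketch of the polling idiom the deleted code relied on (names illustrative, not Helix's actual TestHelper):

    import java.util.function.BooleanSupplier;

    // Re-check 'condition' until it holds or 'timeoutMs' elapses.
    static boolean verify(BooleanSupplier condition, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (!condition.getAsBoolean()) {
        if (System.currentTimeMillis() >= deadline) {
          return condition.getAsBoolean();
        }
        Thread.sleep(100L);
      }
      return true;
    }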
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 84df546..6c29f3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -103,72 +103,6 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
Assert.assertTrue(workflowContextNotCreated);
}
- @Test
- public void testWorkflowContextGarbageCollection() throws Exception {
- String workflowName = TestHelper.getTestMethodName();
- Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
- _driver.start(builder1.build());
-
- // Wait until workflow is created and IN_PROGRESS state.
- _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
-
- // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
- // workflow
- Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
- Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
- Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
-
- String workflowContextPath =
- "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
-
- ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
- null, AccessOption.PERSISTENT);
- Assert.assertNotNull(record);
-
- // Wait until workflow is completed.
- _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-
- // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
- // expired.
- boolean workflowExpired = TestHelper.verify(() -> {
- WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
- WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
- IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
- return (wCtx == null && wCfg == null && idealState == null);
- }, TestHelper.WAIT_DURATION);
- Assert.assertTrue(workflowExpired);
-
- _controller.syncStop();
-
- // Write workflow context to ZooKeeper
- _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
- AccessOption.PERSISTENT);
-
- // Verify context is written back to ZK.
- record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
- null, AccessOption.PERSISTENT);
- Assert.assertNotNull(record);
-
- // start controller
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- // Create and start new workflow just to make sure controller is running and new workflow is
- // scheduled successfully.
- String workflowName2 = TestHelper.getTestMethodName() + "_2";
- Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
- _driver.start(builder2.build());
- _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
-
- // Verify that WorkflowContext will be deleted
- boolean contextDeleted = TestHelper.verify(() -> {
- WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
- return (wCtx == null);
- }, TestHelper.WAIT_DURATION);
- Assert.assertTrue(contextDeleted);
- }
-
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);
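
The deleted test exercised the workflow-level garbage collection this revert removes: it re-wrote a WorkflowContext to ZooKeeper after the workflow and its config had expired, then verified the controller eventually deleted the orphan. The orphan rule came from the stage code deleted earlier in this diff, paraphrased here as a fragment (TaskUtil.WORKFLOW_CONTEXT_KW is the context ZNRecord id used in those deleted lines):

    // A cached context record whose WorkflowConfig no longer exists is a GC candidate.
    boolean orphaned = workflowConfig == null
        && record != null
        && TaskUtil.WORKFLOW_CONTEXT_KW.equals(record.getId());

After the revert, TaskGarbageCollectionStage no longer deletes such orphaned contexts.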
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
deleted file mode 100644
index 56e756d..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTaskUtil extends TaskTestBase {
-
- @Test
- public void testGetExpiredJobsFromCache() {
- String workflowName = "TEST_WORKFLOW";
- JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(workflowName);
-
- JobConfig.Builder jobBuilder_0 =
- new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1")
- .setExpiry(1L);
- JobConfig.Builder jobBuilder_1 =
- new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1")
- .setExpiry(1L);
- JobConfig.Builder jobBuilder_2 =
- new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1")
- .setExpiry(1L);
- JobConfig.Builder jobBuilder_3 =
- new JobConfig.Builder().setJobId("Job_3").setTargetResource("1").setCommand("1")
- .setExpiry(1L);
- Workflow jobQueue =
- queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
- .enqueueJob("Job_2", jobBuilder_2).enqueueJob("Job_3", jobBuilder_3).build();
-
- WorkflowContext workflowContext = mock(WorkflowContext.class);
- Map<String, TaskState> jobStates = new HashMap<>();
- jobStates.put(workflowName + "_Job_0", TaskState.COMPLETED);
- jobStates.put(workflowName + "_Job_1", TaskState.COMPLETED);
- jobStates.put(workflowName + "_Job_2", TaskState.FAILED);
- jobStates.put(workflowName + "_Job_3", TaskState.COMPLETED);
- when(workflowContext.getJobStates()).thenReturn(jobStates);
-
- JobConfig jobConfig = mock(JobConfig.class);
- WorkflowControllerDataProvider workflowControllerDataProvider =
- mock(WorkflowControllerDataProvider.class);
- when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1")).thenReturn(null);
- when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_1"))
- .thenReturn(jobConfig);
- when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_2"))
- .thenReturn(jobConfig);
- when(workflowControllerDataProvider.getJobConfig(workflowName + "_Job_3"))
- .thenReturn(jobConfig);
-
- JobContext jobContext = mock(JobContext.class);
- when(jobContext.getFinishTime()).thenReturn(System.currentTimeMillis());
-
- when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_1")).thenReturn(null);
- when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_2"))
- .thenReturn(jobContext);
- when(workflowControllerDataProvider.getJobContext(workflowName + "_Job_3"))
- .thenReturn(jobContext);
-
- Set<String> expectedJobs = new HashSet<>();
- expectedJobs.add(workflowName + "_Job_0");
- expectedJobs.add(workflowName + "_Job_3");
- Assert.assertEquals(TaskUtil
- .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
- workflowContext), expectedJobs);
- }
-}