You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/11/13 22:23:46 UTC
[helix] branch master updated: Implement job context garbage collection (#1520)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 56161ec Implement job context garbage collection (#1520)
56161ec is described below
commit 56161ecdbf8c1b55529e79ab720232a7b08328bc
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Nov 13 14:21:26 2020 -0800
Implement job context garbage collection (#1520)
Workflow context garbage collection was implemented previously.
This commit adds methods that remove job contexts that
do not have a corresponding job config.
---
.../helix/controller/stages/AttributeName.java | 2 +
.../stages/TaskGarbageCollectionStage.java | 13 +++++
.../main/java/org/apache/helix/task/TaskUtil.java | 21 ++++++++
.../helix/integration/task/TestStuckTaskQuota.java | 4 +-
.../task/TestWorkflowContextWithoutConfig.java | 56 ++++++++++++++++++++++
.../helix/rest/server/AbstractTestClass.java | 8 ++--
.../apache/helix/rest/server/TestJobAccessor.java | 6 +--
.../apache/helix/rest/server/TestTaskAccessor.java | 4 +-
.../helix/rest/server/TestWorkflowAccessor.java | 2 +-
9 files changed, 104 insertions(+), 12 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 9093166..d9ca40b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -48,5 +48,7 @@ public enum AttributeName {
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
TO_BE_PURGED_WORKFLOWS,
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+ JOBS_WITHOUT_CONFIG,
+ // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
TO_BE_PURGED_JOBS_MAP
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 0b0d7d4..553fa0a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -30,6 +30,7 @@ import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
@@ -62,10 +63,13 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
Map<String, Set<String>> expiredJobsMap = new HashMap<>();
Set<String> workflowsToBePurged = new HashSet<>();
+ Set<String> jobsWithoutConfig = new HashSet<>();
+
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+ JobConfig jobConfig = dataProvider.getJobConfig(entry.getKey());
if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
.isJobQueue())) {
WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
@@ -93,12 +97,18 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
.equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
// Find workflows that need to be purged
workflowsToBePurged.add(entry.getKey());
+ } else if (jobConfig == null && entry.getValue() != null && entry.getValue().getId()
+ .equals(TaskUtil.TASK_CONTEXT_KW)) {
+ // Find jobs that need to be purged due to missing config
+ jobsWithoutConfig.add(entry.getKey());
}
}
event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
Collections.unmodifiableMap(expiredJobsMap));
event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
Collections.unmodifiableSet(workflowsToBePurged));
+ event.addAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name(),
+ Collections.unmodifiableSet(jobsWithoutConfig));
super.process(event);
}
@@ -117,6 +127,8 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
Set<String> toBePurgedWorkflows =
event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+ Set<String> jobsWithoutConfig =
+ event.getAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name());
for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
try {
@@ -127,6 +139,7 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
}
TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+ TaskUtil.jobGarbageCollection(jobsWithoutConfig, manager);
}
private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 890b151..dafd386 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -1143,6 +1143,27 @@ public class TaskUtil {
}
/**
+ * The function that removes IdealStates and job contexts of the jobs that need to be
+ * deleted.
+ * Warning: This method should only be used for the jobs that have job context and do not have job
+ * config.
+ * @param jobsWithoutConfig
+ * @param manager
+ */
+ public static void jobGarbageCollection(final Set<String> jobsWithoutConfig,
+ final HelixManager manager) {
+ for (String jobName : jobsWithoutConfig) {
+ LOG.warn(
+ "JobContext exists for job {}. However, job Config is missing! Deleting the JobContext and IdealState!!",
+ jobName);
+ if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+ jobName)) {
+ LOG.warn("Failed to clean up the job {}", jobName);
+ }
+ }
+ }
+
+ /**
* Get target thread pool size from InstanceConfig first; if InstanceConfig doesn't exist or the
* value is undefined, try ClusterConfig; if the value is undefined in ClusterConfig, fall back
* to the default value.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
index a118c6b..b8da096 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
@@ -149,8 +149,8 @@ public class TestStuckTaskQuota extends TaskTestBase {
.getAssignedParticipant(0)));
latch.countDown();
// Stop the workflow2 and workflow3
- _driver.waitToStop(workflowName2, 5000L);
- _driver.waitToStop(workflowName3, 5000L);
+ _driver.stop(workflowName2);
+ _driver.stop(workflowName3);
}
private void startParticipantAndRegisterNewMockTask(int participantIndex) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 8172ea9..25736ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -20,7 +20,10 @@ package org.apache.helix.integration.task;
*/
import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.task.JobConfig;
@@ -38,6 +41,7 @@ import com.google.common.collect.ImmutableMap;
* Test workflow context will be deleted if workflow config has been removed.
*/
public class TestWorkflowContextWithoutConfig extends TaskTestBase {
+
@Test
public void testWorkflowContextGarbageCollection() throws Exception {
String workflowName = TestHelper.getTestMethodName();
@@ -100,6 +104,58 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
Assert.assertTrue(contextDeleted);
}
+ @Test
+ public void testJobContextGarbageCollection() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder builder = new Workflow.Builder(workflowName).setExpiry(1000000L);
+ JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setMaxAttemptsPerTask(1).setWorkflow(workflowName).setTimeoutPerTask(Long.MAX_VALUE)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+ builder.addJob("JOB0", jobBuilder1);
+
+ _driver.start(builder.build());
+
+ // Wait until workflow is COMPLETED
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+ JobContext jobContext =
+ accessor.getProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB0"));
+
+ Assert.assertNotNull(jobContext);
+
+ _controller.syncStop();
+
+ // Write a job context for JOB1 (which has no job config) to ZooKeeper
+ jobContext.setName(TaskUtil.getNamespacedJobName(workflowName, "JOB1"));
+ accessor.setProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB1"), jobContext);
+
+ // Verify context is written back to ZK.
+ jobContext =
+ accessor.getProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB1"));
+ Assert.assertNotNull(jobContext);
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ // Create and start new workflow just to make sure controller is running and new workflow is
+ // scheduled successfully.
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+ Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+ _driver.start(builder2.build());
+ _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+ // Verify that JobContext will be deleted
+ boolean contextDeleted = TestHelper.verify(() -> {
+ JobContext jobCtx = _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "JOB1"));
+ return (jobCtx == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(contextDeleted);
+ }
+
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 6d16e01..e937402 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -105,7 +105,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
// For a single-ZK/Helix environment
protected static final String ZK_ADDR = "localhost:2123";
protected static final String WORKFLOW_PREFIX = "Workflow_";
- protected static final String JOB_PREFIX = "Job_";
+ protected static final String JOB_PREFIX = "JOB";
protected static int NUM_PARTITIONS = 10;
protected static int NUM_REPLICA = 2;
protected static int MIN_ACTIVE_REPLICA = 3;
@@ -426,9 +426,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
}
workflows.put(WORKFLOW_PREFIX + i, workflow.build());
WorkflowContext workflowContext = TaskTestUtil
- .buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.IN_PROGRESS,
+ .buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.FAILED,
System.currentTimeMillis(), TaskState.COMPLETED, TaskState.COMPLETED,
- TaskState.IN_PROGRESS);
+ TaskState.FAILED);
_baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(),
TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE),
workflowContext.getRecord(), AccessOption.PERSISTENT);
@@ -451,7 +451,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
for (int i = 0; i < numJobs; i++) {
JobConfig.Builder job =
new JobConfig.Builder().setCommand("DummyCommand").setTargetResource("RESOURCE")
- .setWorkflow(workflowName);
+ .setWorkflow(workflowName).setJobId(workflowName + "_" + JOB_PREFIX + i);
jobCfgs.add(job);
JobContext jobContext = TaskTestUtil
.buildJobContext(System.currentTimeMillis(), System.currentTimeMillis() + 1,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
index ce8b711..b75a4f8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
@@ -145,7 +145,7 @@ public class TestJobAccessor extends AbstractTestClass {
@Test(dependsOnMethods = "testCreateJob")
public void testGetAddJobContent() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
- String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/userContent";
+ String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/userContent";
// Empty user content
String body =
@@ -178,8 +178,8 @@ public class TestJobAccessor extends AbstractTestClass {
@Test(dependsOnMethods = "testGetAddJobContent")
public void testInvalidGetAndUpdateJobContentStore() {
System.out.println("Start test :" + TestHelper.getTestMethodName());
- String validURI = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/userContent";
- String invalidURI1 = "clusters/" + CLUSTER_NAME + "/workflows/xxx/jobs/Job_0/userContent"; // workflow not exist
+ String validURI = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/userContent";
+ String invalidURI1 = "clusters/" + CLUSTER_NAME + "/workflows/xxx/jobs/JOB0/userContent"; // workflow not exist
String invalidURI2 = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/xxx/userContent"; // job not exist
Entity validEntity = Entity.entity("{\"k1\":\"v1\"}", MediaType.APPLICATION_JSON_TYPE);
Entity invalidEntity = Entity.entity("{\"k1\":{}}", MediaType.APPLICATION_JSON_TYPE); // not Map<String, String>
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
index b815d1c..c4f5ed3 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
@@ -38,8 +38,8 @@ public class TestTaskAccessor extends AbstractTestClass {
@Test
public void testGetAddTaskUserContent() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
- String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/tasks/0/userContent";
- String uriTaskDoesNotExist = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/tasks/xxx/userContent";
+ String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/tasks/0/userContent";
+ String uriTaskDoesNotExist = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/tasks/xxx/userContent";
// Empty user content
String body =
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index e08218c..e5364da 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -112,7 +112,7 @@ public class TestWorkflowAccessor extends AbstractTestClass {
Response.Status.OK.getStatusCode(), true);
JsonNode node = OBJECT_MAPPER.readTree(body);
Assert.assertEquals(node.get("STATE").textValue(),
- TaskState.IN_PROGRESS.name());
+ TaskState.FAILED.name());
System.out.println("End test :" + TestHelper.getTestMethodName());
}