You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/11/13 22:23:46 UTC

[helix] branch master updated: Implement job context garbage collection (#1520)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 56161ec  Implement job context garbage collection (#1520)
56161ec is described below

commit 56161ecdbf8c1b55529e79ab720232a7b08328bc
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Nov 13 14:21:26 2020 -0800

    Implement job context garbage collection (#1520)
    
    Previously, garbage collection had been implemented for workflow contexts.
    This commit adds methods that remove job contexts which do not have a
    corresponding job config.
---
 .../helix/controller/stages/AttributeName.java     |  2 +
 .../stages/TaskGarbageCollectionStage.java         | 13 +++++
 .../main/java/org/apache/helix/task/TaskUtil.java  | 21 ++++++++
 .../helix/integration/task/TestStuckTaskQuota.java |  4 +-
 .../task/TestWorkflowContextWithoutConfig.java     | 56 ++++++++++++++++++++++
 .../helix/rest/server/AbstractTestClass.java       |  8 ++--
 .../apache/helix/rest/server/TestJobAccessor.java  |  6 +--
 .../apache/helix/rest/server/TestTaskAccessor.java |  4 +-
 .../helix/rest/server/TestWorkflowAccessor.java    |  2 +-
 9 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 9093166..d9ca40b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -48,5 +48,7 @@ public enum AttributeName {
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
   TO_BE_PURGED_WORKFLOWS,
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  JOBS_WITHOUT_CONFIG,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
   TO_BE_PURGED_JOBS_MAP
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 0b0d7d4..553fa0a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -30,6 +30,7 @@ import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -62,10 +63,13 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
 
     Map<String, Set<String>> expiredJobsMap = new HashMap<>();
     Set<String> workflowsToBePurged = new HashSet<>();
+    Set<String> jobsWithoutConfig = new HashSet<>();
+
     WorkflowControllerDataProvider dataProvider =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
     for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
       WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
+      JobConfig jobConfig = dataProvider.getJobConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
         WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
@@ -93,12 +97,18 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
           .equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
         // Find workflows that need to be purged
         workflowsToBePurged.add(entry.getKey());
+      } else if (jobConfig == null && entry.getValue() != null && entry.getValue().getId()
+          .equals(TaskUtil.TASK_CONTEXT_KW)) {
+        // Find jobs that need to be purged due to missing config
+        jobsWithoutConfig.add(entry.getKey());
       }
     }
     event.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(),
         Collections.unmodifiableMap(expiredJobsMap));
     event.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(),
         Collections.unmodifiableSet(workflowsToBePurged));
+    event.addAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name(),
+        Collections.unmodifiableSet(jobsWithoutConfig));
 
     super.process(event);
   }
@@ -117,6 +127,8 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
         event.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
     Set<String> toBePurgedWorkflows =
         event.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
+    Set<String> jobsWithoutConfig =
+        event.getAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name());
 
     for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
       try {
@@ -127,6 +139,7 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
     }
 
     TaskUtil.workflowGarbageCollection(toBePurgedWorkflows, manager);
+    TaskUtil.jobGarbageCollection(jobsWithoutConfig, manager);
   }
 
   private static void scheduleNextJobPurge(String workflow, long nextPurgeTime,
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 890b151..dafd386 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -1143,6 +1143,27 @@ public class TaskUtil {
   }
 
   /**
+   * Removes the IdealStates and job contexts of the given jobs.
+   * Warning: This method should only be used for jobs that have a job context but no job
+   * config.
+   * @param jobsWithoutConfig names of the jobs whose job context exists but whose job config
+   *          is missing
+   * @param manager the HelixManager used to access the cluster data and the property store
+   */
+  public static void jobGarbageCollection(final Set<String> jobsWithoutConfig,
+      final HelixManager manager) {
+    for (String jobName : jobsWithoutConfig) {
+      LOG.warn(
+          "JobContext exists for job {}. However, job Config is missing! Deleting the JobContext and IdealState!!",
+          jobName);
+      if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
+          jobName)) {
+        LOG.warn("Failed to clean up the job {}", jobName);
+      }
+    }
+  }
+
+  /**
    * Get target thread pool size from InstanceConfig first; if InstanceConfig doesn't exist or the
    * value is undefined, try ClusterConfig; if the value is undefined in ClusterConfig, fall back
    * to the default value.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
index a118c6b..b8da096 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
@@ -149,8 +149,8 @@ public class TestStuckTaskQuota extends TaskTestBase {
                 .getAssignedParticipant(0)));
     latch.countDown();
     // Stop the workflow2 and workflow3
-    _driver.waitToStop(workflowName2, 5000L);
-    _driver.waitToStop(workflowName3, 5000L);
+    _driver.stop(workflowName2);
+    _driver.stop(workflowName3);
   }
 
   private void startParticipantAndRegisterNewMockTask(int participantIndex) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 8172ea9..25736ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -20,7 +20,10 @@ package org.apache.helix.integration.task;
  */
 
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.task.JobConfig;
@@ -38,6 +41,7 @@ import com.google.common.collect.ImmutableMap;
  * Test workflow context will be deleted if workflow config has been removed.
  */
 public class TestWorkflowContextWithoutConfig extends TaskTestBase {
+
   @Test
   public void testWorkflowContextGarbageCollection() throws Exception {
     String workflowName = TestHelper.getTestMethodName();
@@ -100,6 +104,58 @@ public class TestWorkflowContextWithoutConfig extends TaskTestBase {
     Assert.assertTrue(contextDeleted);
   }
 
+  @Test
+  public void testJobContextGarbageCollection() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName).setExpiry(1000000L);
+    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(workflowName).setTimeoutPerTask(Long.MAX_VALUE)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+    builder.addJob("JOB0", jobBuilder1);
+
+    _driver.start(builder.build());
+
+    // Wait until workflow is COMPLETED
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+
+    JobContext jobContext =
+        accessor.getProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB0"));
+
+    Assert.assertNotNull(jobContext);
+
+    _controller.syncStop();
+
+    // Write a job context for JOB1 (a job with no job config) to ZooKeeper
+    jobContext.setName(TaskUtil.getNamespacedJobName(workflowName, "JOB1"));
+    accessor.setProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB1"), jobContext);
+
+    // Verify context is written back to ZK.
+    jobContext =
+        accessor.getProperty(accessor.keyBuilder().jobContextZNode(workflowName, "JOB1"));
+    Assert.assertNotNull(jobContext);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Create and start new workflow just to make sure controller is running and new workflow is
+    // scheduled successfully.
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+    _driver.start(builder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    // Verify that JobContext will be deleted
+    boolean contextDeleted = TestHelper.verify(() -> {
+      JobContext jobCtx = _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "JOB1"));
+      return (jobCtx == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(contextDeleted);
+  }
+
   private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
     final long expiryTime = 5000L;
     Workflow.Builder builder = new Workflow.Builder(workflowName);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 6d16e01..e937402 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -105,7 +105,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   // For a single-ZK/Helix environment
   protected static final String ZK_ADDR = "localhost:2123";
   protected static final String WORKFLOW_PREFIX = "Workflow_";
-  protected static final String JOB_PREFIX = "Job_";
+  protected static final String JOB_PREFIX = "JOB";
   protected static int NUM_PARTITIONS = 10;
   protected static int NUM_REPLICA = 2;
   protected static int MIN_ACTIVE_REPLICA = 3;
@@ -426,9 +426,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       }
       workflows.put(WORKFLOW_PREFIX + i, workflow.build());
       WorkflowContext workflowContext = TaskTestUtil
-          .buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.IN_PROGRESS,
+          .buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.FAILED,
               System.currentTimeMillis(), TaskState.COMPLETED, TaskState.COMPLETED,
-              TaskState.IN_PROGRESS);
+              TaskState.FAILED);
       _baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(),
           TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE),
           workflowContext.getRecord(), AccessOption.PERSISTENT);
@@ -451,7 +451,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     for (int i = 0; i < numJobs; i++) {
       JobConfig.Builder job =
           new JobConfig.Builder().setCommand("DummyCommand").setTargetResource("RESOURCE")
-              .setWorkflow(workflowName);
+              .setWorkflow(workflowName).setJobId(workflowName + "_" + JOB_PREFIX + i);
       jobCfgs.add(job);
       JobContext jobContext = TaskTestUtil
           .buildJobContext(System.currentTimeMillis(), System.currentTimeMillis() + 1,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
index ce8b711..b75a4f8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java
@@ -145,7 +145,7 @@ public class TestJobAccessor extends AbstractTestClass {
   @Test(dependsOnMethods = "testCreateJob")
   public void testGetAddJobContent() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
-    String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/userContent";
+    String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/userContent";
 
     // Empty user content
     String body =
@@ -178,8 +178,8 @@ public class TestJobAccessor extends AbstractTestClass {
   @Test(dependsOnMethods = "testGetAddJobContent")
   public void testInvalidGetAndUpdateJobContentStore() {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
-    String validURI = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/userContent";
-    String invalidURI1 = "clusters/" + CLUSTER_NAME + "/workflows/xxx/jobs/Job_0/userContent"; // workflow not exist
+    String validURI = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/userContent";
+    String invalidURI1 = "clusters/" + CLUSTER_NAME + "/workflows/xxx/jobs/JOB0/userContent"; // workflow not exist
     String invalidURI2 = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/xxx/userContent"; // job not exist
     Entity validEntity = Entity.entity("{\"k1\":\"v1\"}", MediaType.APPLICATION_JSON_TYPE);
     Entity invalidEntity = Entity.entity("{\"k1\":{}}", MediaType.APPLICATION_JSON_TYPE); // not Map<String, String>
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
index b815d1c..c4f5ed3 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestTaskAccessor.java
@@ -38,8 +38,8 @@ public class TestTaskAccessor extends AbstractTestClass {
   @Test
   public void testGetAddTaskUserContent() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
-    String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/tasks/0/userContent";
-    String uriTaskDoesNotExist = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/tasks/xxx/userContent";
+    String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/tasks/0/userContent";
+    String uriTaskDoesNotExist = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/JOB0/tasks/xxx/userContent";
 
     // Empty user content
     String body =
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index e08218c..e5364da 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -112,7 +112,7 @@ public class TestWorkflowAccessor extends AbstractTestClass {
         Response.Status.OK.getStatusCode(), true);
     JsonNode node = OBJECT_MAPPER.readTree(body);
     Assert.assertEquals(node.get("STATE").textValue(),
-        TaskState.IN_PROGRESS.name());
+        TaskState.FAILED.name());
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }