You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2021/09/17 19:56:29 UTC

[helix] branch master updated: Fix adding a task to a job after deleting old tasks (#1875)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e07a02  Fix adding a task to a job after deleting old tasks (#1875)
6e07a02 is described below

commit 6e07a024796ebc1a86c10fa10c7bb799a716b393
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Sep 17 12:56:20 2021 -0700

    Fix adding a task to a job after deleting old tasks (#1875)
    
    In this commit, the issue of dynamically adding a task to a job
    in which some of its tasks have been deleted before is being
    addressed.
---
 .../task/GenericTaskAssignmentCalculator.java      |  11 +--
 .../helix/task/TaskAssignmentCalculator.java       |  36 ++++++++
 .../ThreadCountBasedTaskAssignmentCalculator.java  |  12 +--
 .../helix/integration/task/TestAddDeleteTask.java  | 100 +++++++++++++++++++++
 4 files changed, 139 insertions(+), 20 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 987f2a4..0f2ad13 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -53,16 +53,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Map<String, IdealState> idealStateMap) {
-    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
-    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
-    for (TaskConfig taskCfg : taskMap.values()) {
-      String taskId = taskCfg.getId();
-      int nextPartition = jobCtx.getPartitionSet().size();
-      if (!taskIdMap.containsKey(taskId)) {
-        jobCtx.setTaskIdForPartition(nextPartition, taskId);
-      }
-    }
-    return jobCtx.getPartitionSet();
+    return getAllTaskPartitionsDefault(jobCfg, jobCtx);
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index ac87af2..e7491b7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -121,4 +122,39 @@ public abstract class TaskAssignmentCalculator {
     }
     return deletedPartitions;
   }
+
+  /**
+   * Get all the partitions/tasks that belong to the non-targeted job.
+   * @param jobCfg the job configuration
+   * @param jobCtx the job context
+   * @return set of partition numbers
+   */
+  public Set<Integer> getAllTaskPartitionsDefault(JobConfig jobCfg, JobContext jobCtx) {
+    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+    // Check if a gap exists in the context due to previously
+    // removed tasks. If yes, the missing pIDs should be considered
+    // for newly added tasks
+    Set<Integer> existingPartitions = jobCtx.getPartitionSet();
+    Set<Integer> missingPartitions = new HashSet<>();
+    if (existingPartitions.size() != 0) {
+      for (int pId = 0; pId < Collections.max(existingPartitions); pId++) {
+        if (!existingPartitions.contains(pId)) {
+          missingPartitions.add(pId);
+        }
+      }
+    }
+    for (TaskConfig taskCfg : taskMap.values()) {
+      String taskId = taskCfg.getId();
+      if (!taskIdMap.containsKey(taskId)) {
+        int nextPartition = jobCtx.getPartitionSet().size();
+        if (missingPartitions.size()!=0) {
+          nextPartition = missingPartitions.iterator().next();
+          missingPartitions.remove(nextPartition);
+        }
+        jobCtx.setTaskIdForPartition(nextPartition, taskId);
+      }
+    }
+    return jobCtx.getPartitionSet();
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index b3d85e1..f8a4106 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -63,16 +64,7 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Map<String, IdealState> idealStateMap) {
-    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
-    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
-    for (TaskConfig taskCfg : taskMap.values()) {
-      String taskId = taskCfg.getId();
-      int nextPartition = jobCtx.getPartitionSet().size();
-      if (!taskIdMap.containsKey(taskId)) {
-        jobCtx.setTaskIdForPartition(nextPartition, taskId);
-      }
-    }
-    return jobCtx.getPartitionSet();
+    return getAllTaskPartitionsDefault(jobCfg, jobCtx);
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
index d99707f..e432a07 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
@@ -610,6 +610,106 @@ public class TestAddDeleteTask extends TaskTestBase {
   }
 
   @Test(dependsOnMethods = "testDeleteTaskAndJobCompleted")
+  public void testDeleteMiddleTaskAndAdd() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+
+    JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+        .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "20000"));
+
+    Workflow.Builder workflowBuilder1 =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+    _driver.start(workflowBuilder1.build());
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Wait until initial task goes to RUNNING state
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      if (jobContext == null) {
+        return false;
+      }
+      TaskPartitionState state = jobContext.getPartitionState(0);
+      if (state == null) {
+        return false;
+      }
+      return (state == TaskPartitionState.RUNNING);
+    }, TestHelper.WAIT_DURATION));
+
+    // Only one task (initial task) should be included in the job
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext.getPartitionSet().size() == 1);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add new tasks
+    Map<String, String> taskConfig1 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    Map<String, String> taskConfig2 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    Map<String, String> taskConfig3 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    Map<String, String> taskConfig4 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    TaskConfig task1 = new TaskConfig(null, taskConfig1, null, null);
+    TaskConfig task2 = new TaskConfig(null, taskConfig2, null, null);
+    TaskConfig task3 = new TaskConfig(null, taskConfig3, null, null);
+    TaskConfig task4 = new TaskConfig(null, taskConfig4, null, null);
+
+    _driver.addTask(workflowName, jobName, task1);
+    _driver.addTask(workflowName, jobName, task2);
+    _driver.addTask(workflowName, jobName, task3);
+    _driver.addTask(workflowName, jobName, task4);
+
+    // 5 tasks should be included in the job
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext.getPartitionSet().size() == 5);
+    }, TestHelper.WAIT_DURATION));
+
+    // All Task should be in RUNNING
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      int runningTasks = 0;
+      for (Integer pId: jobContext.getPartitionSet()) {
+        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING) {
+          runningTasks++;
+        }
+      }
+      return (runningTasks == 5);
+    }, TestHelper.WAIT_DURATION));
+
+    _driver.deleteTask(workflowName, jobName, task3.getId());
+
+    // Since one of the tasks has been deleted, we should expect 4 tasks in the context
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext.getPartitionSet().size() == 4);
+    }, TestHelper.WAIT_DURATION));
+
+    // Add new tasks and make sure the task is being added to context
+    Map<String, String> taskConfig5 =
+        new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+    TaskConfig task5 = new TaskConfig(null, taskConfig5, null, null);
+    _driver.addTask(workflowName, jobName, task5);
+
+    Assert.assertTrue(TestHelper.verify(() -> {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+      return (jobContext.getPartitionSet().size() == 5);
+    }, TestHelper.WAIT_DURATION));
+    _driver.stop(workflowName);
+  }
+
+  @Test(dependsOnMethods = "testDeleteTaskAndJobCompleted")
   public void testPartitionDropTargetedJob() throws Exception {
     String workflowName = TestHelper.getTestMethodName();
     String jobName = "JOB0";